Skip to content
This repository has been archived by the owner on Jun 27, 2023. It is now read-only.

feat!: update to latest libp2p interfaces #74

Merged
merged 2 commits into from
Jun 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 18 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,14 +1,25 @@
# libp2p-pubsub <!-- omit in toc -->
# @libp2p/pubsub <!-- omit in toc -->

[![test & maybe release](https://github.com/libp2p/js-libp2p-pubsub/actions/workflows/js-test-and-release.yml/badge.svg)](https://github.com/libp2p/js-libp2p-pubsub/actions/workflows/js-test-and-release.yml)
[![libp2p.io](https://img.shields.io/badge/project-libp2p-yellow.svg?style=flat-square)](http://libp2p.io/)
[![IRC](https://img.shields.io/badge/freenode-%23libp2p-yellow.svg?style=flat-square)](http://webchat.freenode.net/?channels=%23libp2p)
[![Discuss](https://img.shields.io/discourse/https/discuss.libp2p.io/posts.svg?style=flat-square)](https://discuss.libp2p.io)
[![codecov](https://img.shields.io/codecov/c/github/libp2p/js-libp2p-pubsub.svg?style=flat-square)](https://codecov.io/gh/libp2p/js-libp2p-pubsub)
[![CI](https://img.shields.io/github/workflow/status/libp2p/js-libp2p-interfaces/test%20&%20maybe%20release/master?style=flat-square)](https://github.com/libp2p/js-libp2p-pubsub/actions/workflows/js-test-and-release.yml)

> Contains an implementation of the Pubsub interface
> libp2p pubsub base class

## Table of contents <!-- omit in toc -->

- [Install](#install)
- [Usage](#usage)
- [License](#license)
- [Contribution](#contribution)
- [Contribution](#contribution)

## Install

```console
$ npm i @libp2p/pubsub
```

## Usage

Expand All @@ -28,9 +39,9 @@ class MyPubsubImplementation extends PubSubBaseProtocol {

Licensed under either of

* Apache 2.0, ([LICENSE-APACHE](LICENSE-APACHE) / http://www.apache.org/licenses/LICENSE-2.0)
* MIT ([LICENSE-MIT](LICENSE-MIT) / http://opensource.org/licenses/MIT)
- Apache 2.0, ([LICENSE-APACHE](LICENSE-APACHE) / <http://www.apache.org/licenses/LICENSE-2.0>)
- MIT ([LICENSE-MIT](LICENSE-MIT) / <http://opensource.org/licenses/MIT>)

### Contribution
## Contribution

Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in the work by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any additional terms or conditions.
37 changes: 21 additions & 16 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -45,24 +45,24 @@
],
"exports": {
".": {
"import": "./dist/src/index.js",
"types": "./dist/src/index.d.ts"
"types": "./dist/src/index.d.ts",
"import": "./dist/src/index.js"
},
"./errors": {
"import": "./dist/src/errors.js",
"types": "./dist/src/errors.d.ts"
"types": "./dist/src/errors.d.ts",
"import": "./dist/src/errors.js"
},
"./peer-streams": {
"import": "./dist/src/peer-streams.js",
"types": "./dist/src/peer-streams.d.ts"
"types": "./dist/src/peer-streams.d.ts",
"import": "./dist/src/peer-streams.js"
},
"./signature-policy": {
"import": "./dist/src/signature-policy.js",
"types": "./dist/src/signature-policy.d.ts"
"types": "./dist/src/signature-policy.d.ts",
"import": "./dist/src/signature-policy.js"
},
"./utils": {
"import": "./dist/src/utils.js",
"types": "./dist/src/utils.d.ts"
"types": "./dist/src/utils.d.ts",
"import": "./dist/src/utils.js"
}
},
"eslintConfig": {
Expand Down Expand Up @@ -172,26 +172,31 @@
"release": "aegir release"
},
"dependencies": {
"@libp2p/crypto": "^0.22.8",
"@libp2p/interfaces": "^2.0.0",
"@libp2p/components": "^1.0.0",
"@libp2p/crypto": "^1.0.0",
"@libp2p/interfaces": "^3.0.2",
"@libp2p/logger": "^1.1.0",
"@libp2p/peer-collections": "^1.0.0",
"@libp2p/peer-id": "^1.1.0",
"@libp2p/topology": "^1.1.0",
"@multiformats/multiaddr": "^10.1.5",
"@libp2p/topology": "^2.0.0",
"@multiformats/multiaddr": "^10.2.0",
"abortable-iterator": "^4.0.2",
"err-code": "^3.0.1",
"iso-random-stream": "^2.0.0",
"it-length-prefixed": "^7.0.1",
"it-pipe": "^2.0.3",
"it-pushable": "^2.0.1",
"it-pushable": "^3.0.0",
"multiformats": "^9.6.3",
"p-queue": "^7.2.0",
"uint8arrays": "^3.0.0"
},
"devDependencies": {
"@libp2p/interface-connection": "^1.0.1",
"@libp2p/interface-peer-id": "^1.0.2",
"@libp2p/interface-pubsub": "^1.0.1",
"@libp2p/interface-registrar": "^1.0.0",
"@libp2p/peer-id-factory": "^1.0.0",
"aegir": "^37.0.7",
"aegir": "^37.2.0",
"delay": "^5.0.0",
"it-pair": "^2.0.2",
"p-defer": "^4.0.0",
Expand Down
29 changes: 16 additions & 13 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ import {
signMessage,
verifySignature
} from './sign.js'
import type { PeerId } from '@libp2p/interfaces/peer-id'
import type { IncomingStreamData } from '@libp2p/interfaces/registrar'
import type { Connection } from '@libp2p/interfaces/connection'
import type { PubSub, Message, StrictNoSign, StrictSign, PubSubInit, PubSubEvents, PeerStreams, PubSubRPCMessage, PubSubRPC, PubSubRPCSubscription, SubscriptionChangeData, PublishResult } from '@libp2p/interfaces/pubsub'
import type { PeerId } from '@libp2p/interface-peer-id'
import type { IncomingStreamData } from '@libp2p/interface-registrar'
import type { Connection } from '@libp2p/interface-connection'
import type { PubSub, Message, StrictNoSign, StrictSign, PubSubInit, PubSubEvents, PeerStreams, PubSubRPCMessage, PubSubRPC, PubSubRPCSubscription, SubscriptionChangeData, PublishResult } from '@libp2p/interface-pubsub'
import { PeerMap, PeerSet } from '@libp2p/peer-collections'
import { Components, Initializable } from '@libp2p/interfaces/components'
import { Components, Initializable } from '@libp2p/components'

const log = logger('libp2p:pubsub')

Expand Down Expand Up @@ -63,7 +63,7 @@ export abstract class PubSubBaseProtocol<Events = PubSubEvents> extends EventEmi
public multicodecs: string[]
public components: Components = new Components()

private _registrarTopologyId: string | undefined
private _registrarTopologyIds: string[] | undefined
protected enabled: boolean

constructor (props: PubSubInit) {
Expand Down Expand Up @@ -112,17 +112,18 @@ export abstract class PubSubBaseProtocol<Events = PubSubEvents> extends EventEmi

log('starting')

const registrar = this.components.getRegistrar()
// Incoming streams
// Called after a peer dials us
await this.components.getRegistrar().handle(this.multicodecs, this._onIncomingStream)
await Promise.all(this.multicodecs.map(async multicodec => await registrar.handle(multicodec, this._onIncomingStream)))

// register protocol with topology
// Topology callbacks called on connection manager changes
const topology = createTopology({
onConnect: this._onPeerConnected,
onDisconnect: this._onPeerDisconnected
})
this._registrarTopologyId = await this.components.getRegistrar().register(this.multicodecs, topology)
this._registrarTopologyIds = await Promise.all(this.multicodecs.map(async multicodec => await registrar.register(multicodec, topology)))

log('started')
this.started = true
Expand All @@ -136,12 +137,14 @@ export abstract class PubSubBaseProtocol<Events = PubSubEvents> extends EventEmi
return
}

const registrar = this.components.getRegistrar()

// unregister protocol and handlers
if (this._registrarTopologyId != null) {
this.components.getRegistrar().unregister(this._registrarTopologyId)
if (this._registrarTopologyIds != null) {
this._registrarTopologyIds?.map(id => registrar.unregister(id))
}

await this.components.getRegistrar().unhandle(this.multicodecs)
await Promise.all(this.multicodecs.map(async multicodec => await registrar.unhandle(multicodec)))

log('stopping')
for (const peerStreams of this.peers.values()) {
Expand Down Expand Up @@ -553,7 +556,7 @@ export abstract class PubSubBaseProtocol<Events = PubSubEvents> extends EventEmi
/**
* Get a list of the peer-ids that are subscribed to one topic.
*/
getSubscribers (topic: string) {
getSubscribers (topic: string): PeerId[] {
if (!this.started) {
throw errcode(new Error('not started yet'), 'ERR_NOT_STARTED_YET')
}
Expand Down Expand Up @@ -676,7 +679,7 @@ export abstract class PubSubBaseProtocol<Events = PubSubEvents> extends EventEmi
return Array.from(this.subscriptions)
}

getPeers () {
getPeers (): PeerId[] {
if (!this.started) {
throw new Error('Pubsub is not started')
}
Expand Down
6 changes: 3 additions & 3 deletions src/peer-streams.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ import * as lp from 'it-length-prefixed'
import { pushable } from 'it-pushable'
import { pipe } from 'it-pipe'
import { abortableSource } from 'abortable-iterator'
import type { PeerId } from '@libp2p/interfaces/peer-id'
import type { Stream } from '@libp2p/interfaces/connection'
import type { PeerId } from '@libp2p/interface-peer-id'
import type { Stream } from '@libp2p/interface-connection'
import type { Pushable } from 'it-pushable'
import type { PeerStreamEvents } from '@libp2p/interfaces/pubsub'
import type { PeerStreamEvents } from '@libp2p/interface-pubsub'

const log = logger('libp2p-pubsub:peer-streams')

Expand Down
4 changes: 2 additions & 2 deletions src/sign.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { concat as uint8ArrayConcat } from 'uint8arrays/concat'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { toRpcMessage } from './utils.js'
import type { PeerId } from '@libp2p/interfaces/peer-id'
import type { PeerId } from '@libp2p/interface-peer-id'
import { keys } from '@libp2p/crypto'
import type { Message, PubSubRPCMessage } from '@libp2p/interfaces/pubsub'
import type { Message, PubSubRPCMessage } from '@libp2p/interface-pubsub'
import { peerIdFromKeys } from '@libp2p/peer-id'

export const SignPrefix = uint8ArrayFromString('libp2p-pubsub:')
Expand Down
2 changes: 1 addition & 1 deletion src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { randomBytes } from 'iso-random-stream'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
import { sha256 } from 'multiformats/hashes/sha2'
import type { Message, PubSubRPCMessage } from '@libp2p/interfaces/pubsub'
import type { Message, PubSubRPCMessage } from '@libp2p/interface-pubsub'
import { peerIdFromBytes } from '@libp2p/peer-id'
import { codes } from './errors.js'
import errcode from 'err-code'
Expand Down
2 changes: 1 addition & 1 deletion test/emit-self.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import {
} from './utils/index.js'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import delay from 'delay'
import { Components } from '@libp2p/interfaces/components'
import { Components } from '@libp2p/components'

const protocol = '/pubsub/1.0.0'
const topic = 'foo'
Expand Down
2 changes: 1 addition & 1 deletion test/instance.spec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { expect } from 'aegir/chai'
import { PubSubBaseProtocol } from '../src/index.js'
import type { PublishResult, PubSubRPC, PubSubRPCMessage } from '@libp2p/interfaces/pubsub'
import type { PublishResult, PubSubRPC, PubSubRPCMessage } from '@libp2p/interface-pubsub'

class PubsubProtocol extends PubSubBaseProtocol {
decodeRpc (bytes: Uint8Array): PubSubRPC {
Expand Down
16 changes: 8 additions & 8 deletions test/lifecycle.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ import {
MockRegistrar,
mockIncomingStreamEvent
} from './utils/index.js'
import type { PeerId } from '@libp2p/interfaces/peer-id'
import type { Registrar } from '@libp2p/interfaces/registrar'
import type { PublishResult, PubSubRPC, PubSubRPCMessage } from '@libp2p/interfaces/pubsub'
import { Components } from '@libp2p/interfaces/components'
import type { PeerId } from '@libp2p/interface-peer-id'
import type { Registrar } from '@libp2p/interface-registrar'
import type { PublishResult, PubSubRPC, PubSubRPCMessage } from '@libp2p/interface-pubsub'
import { Components } from '@libp2p/components'

class PubsubProtocol extends PubSubBaseProtocol {
decodeRpc (bytes: Uint8Array): PubSubRPC {
Expand Down Expand Up @@ -158,7 +158,7 @@ describe('pubsub base lifecycle', () => {

// Notify peers of connection
await topologyA.onConnect(peerIdB, c0)
await handlerB(await mockIncomingStreamEvent(protocol, c1, peerIdA))
await handlerB.handler(await mockIncomingStreamEvent(protocol, c1, peerIdA))

expect(pubsubA.peers.size).to.be.eql(1)
expect(pubsubB.peers.size).to.be.eql(1)
Expand All @@ -179,7 +179,7 @@ describe('pubsub base lifecycle', () => {
sinon.spy(c0, 'newStream')

await topologyA.onConnect(peerIdB, c0)
await handlerB(await mockIncomingStreamEvent(protocol, c1, peerIdA))
await handlerB.handler(await mockIncomingStreamEvent(protocol, c1, peerIdA))
expect(c0.newStream).to.have.property('callCount', 1)

// @ts-expect-error _removePeer is a protected method
Expand Down Expand Up @@ -219,7 +219,7 @@ describe('pubsub base lifecycle', () => {
sinon.stub(c0, 'newStream').throws(error)

await topologyA.onConnect(peerIdB, c0)
await handlerB(await mockIncomingStreamEvent(protocol, c1, peerIdA))
await handlerB.handler(await mockIncomingStreamEvent(protocol, c1, peerIdA))

expect(c0.newStream).to.have.property('callCount', 1)
})
Expand All @@ -237,7 +237,7 @@ describe('pubsub base lifecycle', () => {
const [c0, c1] = ConnectionPair()

await topologyA.onConnect(peerIdB, c0)
await handlerB(await mockIncomingStreamEvent(protocol, c1, peerIdA))
await handlerB.handler(await mockIncomingStreamEvent(protocol, c1, peerIdA))

// Notice peers of disconnect
topologyA?.onDisconnect(peerIdB)
Expand Down
6 changes: 3 additions & 3 deletions test/message.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import {
MockRegistrar,
PubsubImplementation
} from './utils/index.js'
import type { PeerId } from '@libp2p/interfaces/peer-id'
import type { Message } from '@libp2p/interfaces/pubsub'
import { Components } from '@libp2p/interfaces/components'
import type { PeerId } from '@libp2p/interface-peer-id'
import type { Message } from '@libp2p/interface-pubsub'
import { Components } from '@libp2p/components'

describe('pubsub base messages', () => {
let peerId: PeerId
Expand Down
10 changes: 5 additions & 5 deletions test/pubsub.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ import {
PubsubImplementation,
mockIncomingStreamEvent
} from './utils/index.js'
import type { PeerId } from '@libp2p/interfaces/peer-id'
import type { PeerId } from '@libp2p/interface-peer-id'
import { PeerSet } from '@libp2p/peer-collections'
import { Components } from '@libp2p/interfaces/components'
import { Components } from '@libp2p/components'
import { createEd25519PeerId } from '@libp2p/peer-id-factory'
import { noSignMsgId } from '../src/utils.js'
import type { PubSubRPC } from '@libp2p/interfaces/src/pubsub'
import type { PubSubRPC } from '@libp2p/interface-pubsub'
import delay from 'delay'
import pDefer from 'p-defer'

Expand Down Expand Up @@ -149,7 +149,7 @@ describe('pubsub base implementation', () => {
const [c0, c1] = ConnectionPair()

await topologyA.onConnect(peerIdB, c0)
await handlerB(await mockIncomingStreamEvent(protocol, c1, peerIdA))
await handlerB.handler(await mockIncomingStreamEvent(protocol, c1, peerIdA))
})

afterEach(async () => {
Expand Down Expand Up @@ -259,7 +259,7 @@ describe('pubsub base implementation', () => {
const [c0, c1] = ConnectionPair()

await topologyA.onConnect(peerIdB, c0)
await handlerB(await mockIncomingStreamEvent(protocol, c1, peerIdA))
await handlerB.handler(await mockIncomingStreamEvent(protocol, c1, peerIdA))
})

afterEach(async () => {
Expand Down
4 changes: 2 additions & 2 deletions test/sign.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import {
import * as PeerIdFactory from '@libp2p/peer-id-factory'
import { randomSeqno, toRpcMessage } from '../src/utils.js'
import { keys } from '@libp2p/crypto'
import type { Message, PubSubRPCMessage } from '@libp2p/interfaces/pubsub'
import type { PeerId } from '@libp2p/interfaces/peer-id'
import type { Message, PubSubRPCMessage } from '@libp2p/interface-pubsub'
import type { PeerId } from '@libp2p/interface-peer-id'

function encodeMessage (message: PubSubRPCMessage) {
return RPC.Message.encode(message)
Expand Down
6 changes: 3 additions & 3 deletions test/topic-validators.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ import {
MockRegistrar,
PubsubImplementation
} from './utils/index.js'
import type { PeerId } from '@libp2p/interfaces/peer-id'
import type { PubSubRPC } from '@libp2p/interfaces/pubsub'
import { Components } from '@libp2p/interfaces/components'
import type { PeerId } from '@libp2p/interface-peer-id'
import type { PubSubRPC } from '@libp2p/interface-pubsub'
import { Components } from '@libp2p/components'

const protocol = '/pubsub/1.0.0'

Expand Down
2 changes: 1 addition & 1 deletion test/utils.spec.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { expect } from 'aegir/chai'
import * as utils from '../src/utils.js'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import type { Message, PubSubRPCMessage } from '@libp2p/interfaces/pubsub'
import type { Message, PubSubRPCMessage } from '@libp2p/interface-pubsub'
import { peerIdFromBytes, peerIdFromString } from '@libp2p/peer-id'

describe('utils', () => {
Expand Down
Loading