Skip to content
This repository has been archived by the owner on Jun 26, 2023. It is now read-only.

Commit

Permalink
feat: add peer store/records, and streams are just streams (#160)
Browse files Browse the repository at this point in the history
The peer store is used by other libp2p components, so split it out
from libp2p so those components can break their dependencies on
libp2p and still run their tests.

Also makes `MuxedStream`s `Stream`s as the muxing should be invisible
to the caller.
  • Loading branch information
achingbrain authored Feb 9, 2022
1 parent fa5ce71 commit 8860a0c
Show file tree
Hide file tree
Showing 76 changed files with 6,798 additions and 228 deletions.
1 change: 1 addition & 0 deletions packages/libp2p-connection/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@
"err-code": "^3.0.1"
},
"devDependencies": {
"@libp2p/interface-compliance-tests": "^1.0.0",
"@libp2p/peer-id-factory": "^1.0.0",
"aegir": "^36.1.3"
}
Expand Down
18 changes: 6 additions & 12 deletions packages/libp2p-connection/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,25 +1,19 @@
import type { Multiaddr } from '@multiformats/multiaddr'
import errCode from 'err-code'
import { OPEN, CLOSING, CLOSED } from '@libp2p/interfaces/connection/status'
import type { MuxedStream } from '@libp2p/interfaces/stream-muxer'
import type { ConnectionStat, StreamData } from '@libp2p/interfaces/connection'
import type { ConnectionStat, Metadata, ProtocolStream, Stream } from '@libp2p/interfaces/connection'
import type { PeerId } from '@libp2p/interfaces/peer-id'

const connectionSymbol = Symbol.for('@libp2p/interface-connection/connection')

export interface ProtocolStream {
protocol: string
stream: MuxedStream
}

interface ConnectionOptions {
localAddr: Multiaddr
remoteAddr: Multiaddr
localPeer: PeerId
remotePeer: PeerId
newStream: (protocols: string[]) => Promise<ProtocolStream>
close: () => Promise<void>
getStreams: () => MuxedStream[]
getStreams: () => Stream[]
stat: ConnectionStat
}

Expand Down Expand Up @@ -69,11 +63,11 @@ export class Connection {
/**
* Reference to the getStreams function of the muxer
*/
private readonly _getStreams: () => MuxedStream[]
private readonly _getStreams: () => Stream[]
/**
* Connection streams registry
*/
public readonly registry: Map<string, StreamData>
public readonly registry: Map<string, Metadata>
private _closing: boolean

/**
Expand Down Expand Up @@ -149,9 +143,9 @@ export class Connection {
/**
* Add a stream when it is opened to the registry
*/
addStream (muxedStream: MuxedStream, streamData: StreamData) {
addStream (stream: Stream, metadata: Metadata) {
// Add metadata for the stream
this.registry.set(muxedStream.id, streamData)
this.registry.set(stream.id, metadata)
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import tests from '../../src/connection/index.js'
import { Connection } from '@libp2p/connection'
import peers from '../../src/utils/peers.js'
import tests from '@libp2p/interface-compliance-tests/connection'
import { Connection } from '../src/index.js'
import peers from '@libp2p/interface-compliance-tests/utils/peers'
import * as PeerIdFactory from '@libp2p/peer-id-factory'
import { Multiaddr } from '@multiformats/multiaddr'
import { pair } from 'it-pair'
import type { MuxedStream } from '@libp2p/interfaces/stream-muxer'
import type { Stream } from '@libp2p/interfaces/connection'

describe('compliance tests', () => {
tests({
Expand All @@ -19,7 +19,7 @@ describe('compliance tests', () => {
PeerIdFactory.createFromJSON(peers[0]),
PeerIdFactory.createFromJSON(peers[1])
])
const openStreams: MuxedStream[] = []
const openStreams: Stream[] = []
let streamId = 0

const connection = new Connection({
Expand All @@ -39,7 +39,7 @@ describe('compliance tests', () => {
},
newStream: async (protocols) => {
const id = `${streamId++}`
const stream: MuxedStream = {
const stream: Stream = {
...pair(),
close: async () => {
await stream.sink(async function * () {}())
Expand Down
4 changes: 2 additions & 2 deletions packages/libp2p-connection/test/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { Connection } from '../src/index.js'
import * as PeerIdFactory from '@libp2p/peer-id-factory'
import { pair } from 'it-pair'
import { Multiaddr } from '@multiformats/multiaddr'
import type { MuxedStream } from '@libp2p/interfaces/stream-muxer'
import type { Stream } from '@libp2p/interfaces/connection'

const peers = [{
id: 'QmNMMAqSxPetRS1cVMmutW5BCN1qQQyEr4u98kUvZjcfEw',
Expand Down Expand Up @@ -55,7 +55,7 @@ describe('connection tests', () => {
},
newStream: async (protocols) => {
const id = `${streamId++}`
const stream: MuxedStream = {
const stream: Stream = {
...pair<Uint8Array>(),
close: async () => await stream.sink(async function * () {}()),
id,
Expand Down
3 changes: 3 additions & 0 deletions packages/libp2p-connection/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
},
{
"path": "../libp2p-peer-id-factory"
},
{
"path": "../libp2p-interface-compliance-tests"
}
]
}
36 changes: 31 additions & 5 deletions packages/libp2p-interface-compliance-tests/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,12 @@
"types": "./dist/src/stream-muxer/index.d.ts"
},
"./topology": {
"import": "./dist/src/topology/index.js",
"types": "./dist/src/topology/index.d.ts"
"import": "./dist/src/topology/topology.js",
"types": "./dist/src/topology/topology.d.ts"
},
"./topology/multicodec-toplogy": {
"import": "./dist/src/topology/multicodec-toplogy.js",
"types": "./dist/src/topology/multicodec-toplogy.d.ts"
},
"./transport": {
"import": "./dist/src/transport/index.js",
Expand All @@ -84,6 +88,30 @@
"import": "./dist/src/transport/utils/index.js",
"types": "./dist/src/transport/utils/index.d.ts"
},
"./utils/mock-connection": {
"import": "./dist/src/utils/mock-connection.js",
"types": "./dist/src/utils/mock-connection.d.ts"
},
"./utils/mock-connection-gater": {
"import": "./dist/src/utils/mock-connection-gater.js",
"types": "./dist/src/utils/mock-connection-gater.d.ts"
},
"./utils/mock-multiaddr-connection": {
"import": "./dist/src/utils/mock-multiaddr-connection.js",
"types": "./dist/src/utils/mock-multiaddr-connection.d.ts"
},
"./utils/mock-muxer": {
"import": "./dist/src/utils/mock-muxer.js",
"types": "./dist/src/utils/mock-muxer.d.ts"
},
"./utils/mock-peer-store": {
"import": "./dist/src/utils/mock-peer-store.js",
"types": "./dist/src/utils/mock-peer-store.d.ts"
},
"./utils/mock-upgrader": {
"import": "./dist/src/utils/mock-upgrader.js",
"types": "./dist/src/utils/mock-upgrader.d.ts"
},
"./utils/peers": {
"import": "./dist/src/utils/peers.js",
"types": "./dist/src/utils/peers.d.ts"
Expand Down Expand Up @@ -190,13 +218,11 @@
"test:electron-main": "npm run test -- -t electron-main"
},
"dependencies": {
"@libp2p/connection": "^1.0.0",
"@libp2p/crypto": "^0.22.2",
"@libp2p/interfaces": "^1.0.0",
"@libp2p/peer-id": "^1.0.0",
"@libp2p/peer-id-factory": "^1.0.0",
"@libp2p/pubsub": "^1.0.0",
"@libp2p/topology": "^1.0.0",
"@libp2p/pubsub": "^1.1.0",
"@multiformats/multiaddr": "^10.1.1",
"abortable-iterator": "^4.0.0",
"aegir": "^36.1.3",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { isValidTick } from '../transport/utils/index.js'
import type { DeferredPromise } from 'p-defer'
import type { TestSetup } from '../index.js'
import type { Muxer, MuxerOptions, MuxedStream } from '@libp2p/interfaces/stream-muxer'
import type { Stream } from '@libp2p/interfaces/connection'
import type { Muxer, MuxerOptions } from '@libp2p/interfaces/stream-muxer'
import type { Source, Duplex } from 'it-stream-types'

async function drainAndClose (stream: Duplex<Uint8Array>) {
Expand All @@ -22,8 +23,8 @@ export default (common: TestSetup<Muxer, MuxerOptions>) => {
it('Open a stream from the dialer', async () => {
const p = duplexPair<Uint8Array>()
const dialer = await common.setup()
const onStreamPromise: DeferredPromise<MuxedStream> = defer()
const onStreamEndPromise: DeferredPromise<MuxedStream> = defer()
const onStreamPromise: DeferredPromise<Stream> = defer()
const onStreamEndPromise: DeferredPromise<Stream> = defer()

const listener = await common.setup({
onStream: stream => {
Expand Down Expand Up @@ -71,7 +72,7 @@ export default (common: TestSetup<Muxer, MuxerOptions>) => {

it('Open a stream from the listener', async () => {
const p = duplexPair<Uint8Array>()
const onStreamPromise: DeferredPromise<MuxedStream> = defer()
const onStreamPromise: DeferredPromise<Stream> = defer()
const dialer = await common.setup({
onStream: stream => {
onStreamPromise.resolve(stream)
Expand Down Expand Up @@ -99,8 +100,8 @@ export default (common: TestSetup<Muxer, MuxerOptions>) => {

it('Open a stream on both sides', async () => {
const p = duplexPair<Uint8Array>()
const onDialerStreamPromise: DeferredPromise<MuxedStream> = defer()
const onListenerStreamPromise: DeferredPromise<MuxedStream> = defer()
const onDialerStreamPromise: DeferredPromise<Stream> = defer()
const onListenerStreamPromise: DeferredPromise<Stream> = defer()
const dialer = await common.setup({
onStream: stream => {
onDialerStreamPromise.resolve(stream)
Expand Down Expand Up @@ -134,8 +135,8 @@ export default (common: TestSetup<Muxer, MuxerOptions>) => {
it('Open a stream on one side, write, open a stream on the other side', async () => {
const toString = (source: Source<Uint8Array>) => map(source, (u) => uint8ArrayToString(u))
const p = duplexPair<Uint8Array>()
const onDialerStreamPromise: DeferredPromise<MuxedStream> = defer()
const onListenerStreamPromise: DeferredPromise<MuxedStream> = defer()
const onDialerStreamPromise: DeferredPromise<Stream> = defer()
const onListenerStreamPromise: DeferredPromise<Stream> = defer()
const dialer = await common.setup({
onStream: stream => {
onDialerStreamPromise.resolve(stream)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import drain from 'it-drain'
import { Multiaddr } from '@multiformats/multiaddr'
import { pipe } from 'it-pipe'
import type { Upgrader, MultiaddrConnection } from '@libp2p/interfaces/transport'
import type { Connection, StreamData } from '@libp2p/interfaces/connection'
import type { MuxedStream, Muxer } from '@libp2p/interfaces/stream-muxer'
import type { Connection, Stream, Metadata, ProtocolStream } from '@libp2p/interfaces/connection'
import type { Muxer } from '@libp2p/interfaces/stream-muxer'
import type { Duplex } from 'it-stream-types'

/**
Expand Down Expand Up @@ -46,7 +46,7 @@ export function mockMultiaddrConnection (source: Duplex<Uint8Array>): MultiaddrC

export function mockMuxer (): Muxer {
let streamId = 0
let streams: MuxedStream[] = []
let streams: Stream[] = []
const p = pushable<Uint8Array>()

const muxer: Muxer = {
Expand All @@ -61,7 +61,7 @@ export function mockMuxer (): Muxer {
const echo = pair<Uint8Array>()

const id = `${streamId++}`
const stream: MuxedStream = {
const stream: Stream = {
id,
sink: echo.sink,
source: echo.source,
Expand Down Expand Up @@ -116,7 +116,7 @@ async function createConnection (maConn: MultiaddrConnection, direction: 'inboun
const remotePeerIdStr = remoteAddr.getPeerId()
const remotePeer = remotePeerIdStr != null ? PeerId.fromString(remotePeerIdStr) : await PeerIdFactory.createEd25519PeerId()

const streams: MuxedStream[] = []
const streams: Stream[] = []
let streamId = 0

const registry = new Map()
Expand All @@ -140,13 +140,17 @@ async function createConnection (maConn: MultiaddrConnection, direction: 'inboun
tags: [],
streams,
newStream: async (protocols) => {
if (!Array.isArray(protocols)) {
protocols = [protocols]
}

if (protocols.length === 0) {
throw new Error('protocols must have a length')
}

const id = `${streamId++}`
const stream: MuxedStream = muxer.newStream(id)
const streamData = {
const stream: Stream = muxer.newStream(id)
const streamData: ProtocolStream = {
protocol: protocols[0],
stream
}
Expand All @@ -155,7 +159,7 @@ async function createConnection (maConn: MultiaddrConnection, direction: 'inboun

return streamData
},
addStream: (muxedStream: MuxedStream, streamData: StreamData) => {
addStream: (stream: Stream, metadata: Metadata) => {

},
removeStream: (id: string) => {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@

export function mockConnectionGater () {
return {
denyDialPeer: async () => await Promise.resolve(false),
denyDialMultiaddr: async () => await Promise.resolve(false),
denyInboundConnection: async () => await Promise.resolve(false),
denyOutboundConnection: async () => await Promise.resolve(false),
denyInboundEncryptedConnection: async () => await Promise.resolve(false),
denyOutboundEncryptedConnection: async () => await Promise.resolve(false),
denyInboundUpgradedConnection: async () => await Promise.resolve(false),
denyOutboundUpgradedConnection: async () => await Promise.resolve(false),
filterMultiaddrForPeer: async () => await Promise.resolve(true)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import { PeerId } from '@libp2p/peer-id'
import { createEd25519PeerId } from '@libp2p/peer-id-factory'
import { pipe } from 'it-pipe'
import type { MultiaddrConnection } from '@libp2p/interfaces/transport'
import type { Connection, Stream, Metadata, ProtocolStream } from '@libp2p/interfaces/connection'
import type { Muxer } from '@libp2p/interfaces/stream-muxer'

export async function mockConnection (maConn: MultiaddrConnection, direction: 'inbound' | 'outbound', muxer: Muxer): Promise<Connection> {
const remoteAddr = maConn.remoteAddr
const remotePeerIdStr = remoteAddr.getPeerId()
const remotePeer = remotePeerIdStr != null ? PeerId.fromString(remotePeerIdStr) : await createEd25519PeerId()

const streams: Stream[] = []
let streamId = 0

const registry = new Map()

void pipe(
maConn, muxer, maConn
)

return {
id: 'mock-connection',
remoteAddr,
remotePeer,
stat: {
status: 'OPEN',
direction,
timeline: maConn.timeline,
multiplexer: 'test-multiplexer',
encryption: 'yes-yes-very-secure'
},
registry,
tags: [],
streams,
newStream: async (protocols) => {
if (!Array.isArray(protocols)) {
protocols = [protocols]
}

if (protocols.length === 0) {
throw new Error('protocols must have a length')
}

const id = `${streamId++}`
const stream: Stream = muxer.newStream(id)
const streamData: ProtocolStream = {
protocol: protocols[0],
stream
}

registry.set(id, streamData)

return streamData
},
addStream: (stream: Stream, metadata: Metadata) => {

},
removeStream: (id: string) => {
registry.delete(id)
},
close: async () => {
await maConn.close()
}
}
}
Loading

0 comments on commit 8860a0c

Please sign in to comment.