diff --git a/package.json b/package.json index e59cf77..d4040e2 100644 --- a/package.json +++ b/package.json @@ -172,7 +172,7 @@ }, "dependencies": { "@achingbrain/ip-address": "^8.1.0", - "@libp2p/interface-connection": "^2.1.0", + "@libp2p/interface-connection": "^3.0.1", "@libp2p/interface-peer-store": "^1.0.0", "@libp2p/logger": "^2.0.0", "@multiformats/multiaddr": "^10.1.1", @@ -180,13 +180,16 @@ "err-code": "^3.0.1", "is-loopback-addr": "^2.0.1", "it-stream-types": "^1.0.4", - "private-ip": "^2.1.1" + "private-ip": "^2.1.1", + "uint8arraylist": "^2.3.2" }, "devDependencies": { "aegir": "^37.2.0", "it-all": "^1.0.6", + "it-map": "^1.0.6", "it-pair": "^2.0.2", "it-pipe": "^2.0.2", + "p-defer": "^4.0.0", "uint8arrays": "^3.0.0" } } diff --git a/src/stream-to-ma-conn.ts b/src/stream-to-ma-conn.ts index fd6f837..cbce121 100644 --- a/src/stream-to-ma-conn.ts +++ b/src/stream-to-ma-conn.ts @@ -3,6 +3,7 @@ import { logger } from '@libp2p/logger' import type { Multiaddr } from '@multiformats/multiaddr' import type { MultiaddrConnection } from '@libp2p/interface-connection' import type { Duplex } from 'it-stream-types' +import type { Uint8ArrayList } from 'uint8arraylist' const log = logger('libp2p:stream:converter') @@ -29,7 +30,7 @@ interface StreamOptions { } interface StreamProperties { - stream: Duplex + stream: Duplex remoteAddr: Multiaddr localAddr: Multiaddr } @@ -41,6 +42,13 @@ interface StreamProperties { export function streamToMaConnection (props: StreamProperties, options: StreamOptions = {}) { const { stream, remoteAddr } = props const { sink, source } = stream + + const mapSource = (async function * () { + for await (const list of source) { + yield * list + } + }()) + const maConn: MultiaddrConnection = { async sink (source) { if (options.signal != null) { @@ -60,7 +68,7 @@ export function streamToMaConnection (props: StreamProperties, options: StreamOp } } }, - source: (options.signal != null) ? abortableSource(source, options.signal) : source, + source: (options.signal != null) ? abortableSource(mapSource, options.signal) : mapSource, remoteAddr, /** @type {Timeline} */ timeline: { open: Date.now(), close: undefined }, diff --git a/test/fixtures/pair.ts b/test/fixtures/pair.ts new file mode 100644 index 0000000..bac8554 --- /dev/null +++ b/test/fixtures/pair.ts @@ -0,0 +1,28 @@ +import defer from 'p-defer' +import map from 'it-map' +import type { Source, Duplex } from 'it-stream-types' +import { Uint8ArrayList } from 'uint8arraylist' + +/** + * A pair of streams where one drains from the other + */ +export function pair (): Duplex { + const deferred = defer>() + let piped = false + + return { + sink: async source => { + if (piped) { + throw new Error('already piped') + } + + piped = true + deferred.resolve(source) + }, + source: (async function * () { + const source = await deferred.promise + + yield * map(source, (buf) => buf instanceof Uint8Array ? new Uint8ArrayList(buf) : buf) + }()) + } +} diff --git a/test/stream-to-ma-conn.spec.ts b/test/stream-to-ma-conn.spec.ts index 7f6510f..0e9d52c 100644 --- a/test/stream-to-ma-conn.spec.ts +++ b/test/stream-to-ma-conn.spec.ts @@ -1,7 +1,7 @@ /* eslint-env mocha */ import { expect } from 'aegir/chai' -import { pair } from 'it-pair' +import { pair } from './fixtures/pair.js' import { pipe } from 'it-pipe' import { multiaddr } from '@multiformats/multiaddr' import { streamToMaConnection } from '../src/stream-to-ma-conn.js' @@ -9,8 +9,9 @@ import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' import all from 'it-all' import type { Stream } from '@libp2p/interface-connection' import type { Duplex } from 'it-stream-types' +import type { Uint8ArrayList } from 'uint8arraylist' -function toMuxedStream (stream: Duplex) { +function toMuxedStream (stream: Duplex) { const muxedStream: Stream = { ...stream, close: () => {}, @@ -36,7 +37,7 @@ describe('Convert stream into a multiaddr connection', () => { const remoteAddr = multiaddr('/ip4/100.46.74.201/tcp/6002') it('converts a stream and adds the provided metadata', async () => { - const stream = pair() + const stream = pair() const maConn = streamToMaConnection({ stream: toMuxedStream(stream), @@ -57,7 +58,7 @@ describe('Convert stream into a multiaddr connection', () => { }) it('can stream data over the multiaddr connection', async () => { - const stream = pair() + const stream = pair() const maConn = streamToMaConnection({ stream: toMuxedStream(stream), localAddr,