From 62d8ecbc723e693a2544e69172d99c576d187c23 Mon Sep 17 00:00:00 2001 From: Alex Potsides Date: Fri, 3 Dec 2021 10:45:12 +0000 Subject: [PATCH] feat: dht client (#3947) * Enables `libp2p-kad-dht` in client mode * Updates types with new DHT events BREAKING CHANGE: The DHT API has been refactored to return async iterators of query events --- package.json | 2 +- packages/interface-ipfs-core/package.json | 10 +- .../interface-ipfs-core/src/dht/disabled.js | 7 +- .../interface-ipfs-core/src/dht/find-peer.js | 36 +- .../interface-ipfs-core/src/dht/find-provs.js | 60 +--- packages/interface-ipfs-core/src/dht/get.js | 37 +- .../interface-ipfs-core/src/dht/provide.js | 21 +- packages/interface-ipfs-core/src/dht/put.js | 38 +- packages/interface-ipfs-core/src/dht/query.js | 36 +- packages/interface-ipfs-core/src/dht/utils.js | 39 ++ .../src/miscellaneous/id.js | 1 + packages/ipfs-cli/package.json | 8 +- .../ipfs-cli/src/commands/dht/find-peer.js | 9 +- .../src/commands/dht/find-providers.js | 20 +- packages/ipfs-cli/src/commands/dht/get.js | 9 +- packages/ipfs-cli/src/commands/dht/query.js | 15 +- packages/ipfs-cli/test/dht.spec.js | 77 ++-- packages/ipfs-core-config/package.json | 16 +- .../ipfs-core-config/src/config.browser.js | 6 +- packages/ipfs-core-config/src/config.js | 6 +- .../ipfs-core-config/src/libp2p.browser.js | 7 +- packages/ipfs-core-config/src/libp2p.js | 7 +- packages/ipfs-core-types/src/dht/index.ts | 124 +++++-- packages/ipfs-core-types/src/name/index.ts | 2 +- packages/ipfs-core-utils/package.json | 2 +- .../src/with-timeout-option.js | 3 +- packages/ipfs-core/package.json | 28 +- .../ipfs-core/src/components/add-all/index.js | 1 - packages/ipfs-core/src/components/block/rm.js | 2 +- packages/ipfs-core/src/components/dht.js | 243 ++++++++++--- packages/ipfs-core/src/components/id.js | 7 +- packages/ipfs-core/src/components/index.js | 2 +- packages/ipfs-core/src/components/libp2p.js | 6 +- .../ipfs-core/src/components/name/resolve.js | 26 +- .../ipfs-core/src/components/refs/index.js | 3 +- packages/ipfs-core/src/ipns/publisher.js | 48 +-- packages/ipfs-core/src/ipns/routing/config.js | 10 +- .../src/ipns/routing/dht-datastore.js | 43 +++ .../src/ipns/routing/offline-datastore.js | 2 +- .../src/ipns/routing/pubsub-datastore.js | 11 +- packages/ipfs-core/test/libp2p.spec.js | 2 +- packages/ipfs-core/test/name.spec.js | 10 +- packages/ipfs-daemon/package.json | 7 +- packages/ipfs-daemon/src/index.js | 7 +- packages/ipfs-grpc-client/package.json | 2 +- packages/ipfs-grpc-server/package.json | 2 +- .../ipfs-http-client/src/dht/find-peer.js | 17 +- .../ipfs-http-client/src/dht/find-provs.js | 14 +- packages/ipfs-http-client/src/dht/get.js | 16 +- .../ipfs-http-client/src/dht/map-event.js | 119 +++++++ packages/ipfs-http-client/src/dht/provide.js | 16 +- packages/ipfs-http-client/src/dht/put.js | 16 +- packages/ipfs-http-client/src/dht/query.js | 12 +- .../src/dht/response-types.js | 2 +- packages/ipfs-http-gateway/package.json | 2 +- packages/ipfs-http-server/package.json | 8 +- .../ipfs-http-server/src/api/resources/dht.js | 336 ++++++++++++++---- packages/ipfs-http-server/src/index.js | 5 + .../src/utils/stream-response.js | 15 +- packages/ipfs-http-server/test/inject/dht.js | 306 ++++++++-------- packages/ipfs/package.json | 6 +- packages/ipfs/src/cli.js | 6 + packages/ipfs/test/interface-core.js | 6 +- packages/ipfs/test/interface-http-js.js | 6 +- 64 files changed, 1300 insertions(+), 668 deletions(-) create mode 100644 packages/ipfs-core/src/ipns/routing/dht-datastore.js create mode 100644 packages/ipfs-http-client/src/dht/map-event.js diff --git a/package.json b/package.json index 5879b7df84..d5df09cc1d 100644 --- a/package.json +++ b/package.json @@ -4,7 +4,7 @@ "description": "JavaScript implementation of the IPFS specification", "scripts": { "link": "lerna link", - "reset": "lerna run clean && rimraf packages/*/node_modules node_modules", + "reset": "lerna run clean && rimraf packages/*/node_modules node_modules package-lock.json packages/*/package-lock.json", "test": "lerna run test", "test:node": "lerna run test:node", "test:browser": "lerna run test:browser", diff --git a/packages/interface-ipfs-core/package.json b/packages/interface-ipfs-core/package.json index e980ee4b25..90392f4db0 100644 --- a/packages/interface-ipfs-core/package.json +++ b/packages/interface-ipfs-core/package.json @@ -76,9 +76,9 @@ "ipfs-unixfs": "^6.0.3", "ipfs-unixfs-importer": "^9.0.3", "ipfs-utils": "^9.0.2", - "ipns": "^0.15.0", + "ipns": "^0.16.0", "is-ipfs": "^6.0.1", - "iso-random-stream": "^2.0.0", + "iso-random-stream": "^2.0.2", "it-all": "^1.0.4", "it-buffer-stream": "^2.0.0", "it-concat": "^2.0.0", @@ -90,7 +90,7 @@ "it-pushable": "^1.4.2", "it-tar": "^4.0.0", "it-to-buffer": "^2.0.0", - "libp2p-crypto": "^0.19.7", + "libp2p-crypto": "^0.21.0", "libp2p-websockets": "^0.16.2", "multiaddr": "^10.0.0", "multiformats": "^9.4.13", @@ -99,9 +99,9 @@ "p-map": "^4.0.0", "p-retry": "^4.5.0", "pako": "^1.0.2", - "peer-id": "^0.15.1", + "peer-id": "^0.16.0", "readable-stream": "^3.4.0", - "sinon": "^11.1.1", + "sinon": "^12.0.01", "uint8arrays": "^3.0.0" }, "devDependencies": { diff --git a/packages/interface-ipfs-core/src/dht/disabled.js b/packages/interface-ipfs-core/src/dht/disabled.js index f1c074b952..a099d24872 100644 --- a/packages/interface-ipfs-core/src/dht/disabled.js +++ b/packages/interface-ipfs-core/src/dht/disabled.js @@ -2,7 +2,7 @@ import { expect } from 'aegir/utils/chai.js' import { getDescribe, getIt } from '../utils/mocha.js' -import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' +import all from 'it-all' /** * @typedef {import('ipfsd-ctl').Factory} Factory @@ -42,8 +42,9 @@ export function testDisabled (factory, options) { after(() => factory.clean()) it('should error when DHT not available', async () => { - await expect(nodeA.dht.get(uint8ArrayFromString('/ipns/Qme6KJdKcp85TYbLxuLV7oQzMiLremD7HMoXLZEmgo6Rnh'))) - .to.eventually.be.rejected() + const events = await all(nodeA.dht.get('/ipns/12D3KooWQMSMXmsBvs5YDEQ6tXsaFv9tjuzmDmEvusaiQSFdrJdN')) + + expect(events.filter(event => event.name === 'QUERY_ERROR')).to.not.be.empty() }) }) } diff --git a/packages/interface-ipfs-core/src/dht/find-peer.js b/packages/interface-ipfs-core/src/dht/find-peer.js index a436b721e0..3101a0348d 100644 --- a/packages/interface-ipfs-core/src/dht/find-peer.js +++ b/packages/interface-ipfs-core/src/dht/find-peer.js @@ -3,6 +3,9 @@ import { expect } from 'aegir/utils/chai.js' import { getDescribe, getIt } from '../utils/mocha.js' import testTimeout from '../utils/test-timeout.js' +import drain from 'it-drain' +import all from 'it-all' +import { ensureReachable } from './utils.js' /** * @typedef {import('ipfsd-ctl').Factory} Factory @@ -23,15 +26,12 @@ export function testFindPeer (factory, options) { let nodeA /** @type {import('ipfs-core-types').IPFS} */ let nodeB - /** @type {import('ipfs-core-types/src/root').IDResult} */ - let nodeBId before(async () => { nodeA = (await factory.spawn()).api nodeB = (await factory.spawn()).api - nodeBId = await nodeB.id() - await nodeA.swarm.connect(nodeBId.addresses[0]) + await ensureReachable(nodeA, nodeB) }) after(() => factory.clean()) @@ -39,25 +39,37 @@ export function testFindPeer (factory, options) { it('should respect timeout option when finding a peer on the DHT', async () => { const nodeBId = await nodeB.id() - await testTimeout(() => nodeA.dht.findPeer(nodeBId.id, { + await testTimeout(() => drain(nodeA.dht.findPeer(nodeBId.id, { timeout: 1 - })) + }))) }) it('should find other peers', async () => { const nodeBId = await nodeB.id() - const res = await nodeA.dht.findPeer(nodeBId.id) - const id = res.id.toString() + const results = await all(nodeA.dht.findPeer(nodeBId.id)) + const finalPeer = results.filter(event => event.name === 'FINAL_PEER').pop() + + if (!finalPeer || finalPeer.name !== 'FINAL_PEER') { + throw new Error('No finalPeer event received') + } + + const id = finalPeer.peer.id const nodeAddresses = nodeBId.addresses.map((addr) => addr.nodeAddress()) - const peerAddresses = res.addrs.map(ma => ma.nodeAddress()) + const peerAddresses = finalPeer.peer.multiaddrs.map(ma => ma.nodeAddress()) - expect(id).to.be.eql(nodeBId.id) + expect(id).to.equal(nodeBId.id) expect(peerAddresses).to.deep.include(nodeAddresses[0]) }) - it('should fail to find other peer if peer does not exist', () => { - return expect(nodeA.dht.findPeer('Qmd7qZS4T7xXtsNFdRoK1trfMs5zU94EpokQ9WFtxdPxsZ')).to.eventually.be.rejected() + it('should fail to find other peer if peer does not exist', async () => { + const events = await all(nodeA.dht.findPeer('Qmd7qZS4T7xXtsNFdRoK1trfMs5zU94EpokQ9WFtxdPxsZ')) + + // no finalPeer events found + expect(events.filter(event => event.name === 'FINAL_PEER')).to.be.empty() + + // queryError events found + expect(events.filter(event => event.name === 'QUERY_ERROR')).to.not.be.empty() }) }) } diff --git a/packages/interface-ipfs-core/src/dht/find-provs.js b/packages/interface-ipfs-core/src/dht/find-provs.js index cd848400af..7554c7e98e 100644 --- a/packages/interface-ipfs-core/src/dht/find-provs.js +++ b/packages/interface-ipfs-core/src/dht/find-provs.js @@ -4,8 +4,8 @@ import { expect } from 'aegir/utils/chai.js' import { getDescribe, getIt } from '../utils/mocha.js' import all from 'it-all' import drain from 'it-drain' -import { fakeCid } from './utils.js' import testTimeout from '../utils/test-timeout.js' +import { ensureReachable } from './utils.js' /** * @typedef {import('ipfsd-ctl').Factory} Factory @@ -20,7 +20,7 @@ export function testFindProvs (factory, options) { const it = getIt(options) describe('.dht.findProvs', function () { - this.timeout(20000) + this.timeout(80 * 1000) /** @type {import('ipfs-core-types').IPFS} */ let nodeA @@ -28,26 +28,14 @@ export function testFindProvs (factory, options) { let nodeB /** @type {import('ipfs-core-types').IPFS} */ let nodeC - /** @type {import('ipfs-core-types/src/root').IDResult} */ - let nodeAId - /** @type {import('ipfs-core-types/src/root').IDResult} */ - let nodeBId - /** @type {import('ipfs-core-types/src/root').IDResult} */ - let nodeCId before(async () => { nodeA = (await factory.spawn()).api nodeB = (await factory.spawn()).api nodeC = (await factory.spawn()).api - nodeAId = await nodeA.id() - nodeBId = await nodeB.id() - nodeCId = await nodeC.id() - - await Promise.all([ - nodeB.swarm.connect(nodeAId.addresses[0]), - nodeC.swarm.connect(nodeBId.addresses[0]) - ]) + await ensureReachable(nodeB, nodeA) + await ensureReachable(nodeC, nodeB) }) after(() => factory.clean()) @@ -57,8 +45,6 @@ export function testFindProvs (factory, options) { */ let providedCid before('add providers for the same cid', async function () { - this.timeout(10 * 1000) - const cids = await Promise.all([ nodeB.object.new('unixfs-dir'), nodeC.object.new('unixfs-dir') @@ -79,38 +65,20 @@ export function testFindProvs (factory, options) { }) it('should be able to find providers', async function () { - // @ts-ignore this is mocha - this.timeout(20 * 1000) - - const provs = await all(nodeA.dht.findProvs(providedCid, { numProviders: 2 })) - const providerIds = provs.map((p) => p.id.toString()) + /** @type {string[]} */ + const providerIds = [] - expect(providerIds).to.have.members([ - nodeBId.id, - nodeCId.id - ]) - }) - - it('should take options to override timeout config', async function () { - const options = { - timeout: 1 + for await (const event of nodeA.dht.findProvs(providedCid)) { + if (event.name === 'PROVIDER') { + providerIds.push(...event.providers.map(prov => prov.id)) + } } - const cidV0 = await fakeCid() - const start = Date.now() - let res - - try { - res = await all(nodeA.dht.findProvs(cidV0, options)) - } catch (/** @type {any} */ err) { - // rejected by http client - expect(err).to.have.property('name', 'TimeoutError') - return - } + const nodeBId = await nodeB.id() + const nodeCId = await nodeC.id() - // rejected by the server, errors don't work over http - https://github.com/ipfs/js-ipfs/issues/2519 - expect(res).to.be.an('array').with.lengthOf(0) - expect(Date.now() - start).to.be.lessThan(100) + expect(providerIds).to.include(nodeBId.id) + expect(providerIds).to.include(nodeCId.id) }) }) } diff --git a/packages/interface-ipfs-core/src/dht/get.js b/packages/interface-ipfs-core/src/dht/get.js index c6b20daa6a..0232d5f79c 100644 --- a/packages/interface-ipfs-core/src/dht/get.js +++ b/packages/interface-ipfs-core/src/dht/get.js @@ -3,8 +3,10 @@ import { expect } from 'aegir/utils/chai.js' import { getDescribe, getIt } from '../utils/mocha.js' import testTimeout from '../utils/test-timeout.js' -import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' import { toString as uint8ArrayToString } from 'uint8arrays/to-string' +import drain from 'it-drain' +import all from 'it-all' +import { ensureReachable } from './utils.js' /** * @typedef {import('ipfsd-ctl').Factory} Factory @@ -19,19 +21,18 @@ export function testGet (factory, options) { const it = getIt(options) describe('.dht.get', function () { + this.timeout(80 * 1000) + /** @type {import('ipfs-core-types').IPFS} */ let nodeA /** @type {import('ipfs-core-types').IPFS} */ let nodeB - /** @type {import('ipfs-core-types/src/root').IDResult} */ - let nodeBId before(async () => { nodeA = (await factory.spawn()).api nodeB = (await factory.spawn()).api - nodeBId = await nodeB.id() - await nodeA.swarm.connect(nodeBId.addresses[0]) + await ensureReachable(nodeA, nodeB) }) after(() => factory.clean()) @@ -40,23 +41,33 @@ export function testGet (factory, options) { const data = await nodeA.add('should put a value to the DHT') const publish = await nodeA.name.publish(data.cid) - await testTimeout(() => nodeB.dht.get(uint8ArrayFromString(`/ipns/${publish.name}`), { + await testTimeout(() => drain(nodeB.dht.get(`/ipns/${publish.name}`, { timeout: 1 - })) + }))) }) - it('should error when getting a non-existent key from the DHT', () => { - return expect(nodeA.dht.get(uint8ArrayFromString('non-existing'), { timeout: 100 })) - .to.eventually.be.rejected - .and.be.an.instanceOf(Error) + it('should error when getting a non-existent key from the DHT', async () => { + const key = '/ipns/k51qzi5uqu5dl0dbfddy2wb42nvbc6anyxnkrguy5l0h0bv9kaih6j6vqdskqk' + const events = await all(nodeA.dht.get(key)) + + // no value events found + expect(events.filter(event => event.name === 'VALUE')).to.be.empty() + + // queryError events found + expect(events.filter(event => event.name === 'QUERY_ERROR')).to.not.be.empty() }) it('should get a value after it was put on another node', async () => { const data = await nodeA.add('should put a value to the DHT') const publish = await nodeA.name.publish(data.cid) - const record = await nodeA.dht.get(uint8ArrayFromString(`/ipns/${publish.name}`)) + const events = await all(nodeA.dht.get(`/ipns/${publish.name}`)) + const valueEvent = events.filter(event => event.name === 'VALUE').pop() + + if (!valueEvent || valueEvent.name !== 'VALUE') { + throw new Error('Value event not found') + } - expect(uint8ArrayToString(record)).to.contain(data.cid.toString()) + expect(uint8ArrayToString(valueEvent.value)).to.contain(data.cid.toString()) }) }) } diff --git a/packages/interface-ipfs-core/src/dht/provide.js b/packages/interface-ipfs-core/src/dht/provide.js index 9280398e6d..ea4a6456b7 100644 --- a/packages/interface-ipfs-core/src/dht/provide.js +++ b/packages/interface-ipfs-core/src/dht/provide.js @@ -5,6 +5,7 @@ import { CID } from 'multiformats/cid' import all from 'it-all' import { expect } from 'aegir/utils/chai.js' import { getDescribe, getIt } from '../utils/mocha.js' +import { ensureReachable } from './utils.js' /** * @typedef {import('ipfsd-ctl').Factory} Factory @@ -27,8 +28,8 @@ export function testProvide (factory, options) { before(async () => { ipfs = (await factory.spawn()).api const nodeB = (await factory.spawn()).api - const nodeBId = await nodeB.id() - await ipfs.swarm.connect(nodeBId.addresses[0]) + + await ensureReachable(ipfs, nodeB) }) after(() => factory.clean()) @@ -48,28 +49,14 @@ export function testProvide (factory, options) { .that.include('not found locally') }) - it('should allow multiple CIDs to be passed', async () => { - const res = await all(ipfs.addAll([ - { content: uint8ArrayFromString('t0') }, - { content: uint8ArrayFromString('t1') } - ])) - - await all(ipfs.dht.provide(res.map(f => f.cid))) - }) - it('should provide a CIDv1', async () => { const res = await ipfs.add(uint8ArrayFromString('test'), { cidVersion: 1 }) await all(ipfs.dht.provide(res.cid)) }) - it('should error on non CID arg', () => { + it('should error on non CID arg', async () => { // @ts-expect-error invalid arg return expect(all(ipfs.dht.provide({}))).to.eventually.be.rejected() }) - - it('should error on array containing non CID arg', () => { - // @ts-expect-error invalid arg - return expect(all(ipfs.dht.provide([{}]))).to.eventually.be.rejected() - }) }) } diff --git a/packages/interface-ipfs-core/src/dht/put.js b/packages/interface-ipfs-core/src/dht/put.js index ef42678f1b..c66cf6a204 100644 --- a/packages/interface-ipfs-core/src/dht/put.js +++ b/packages/interface-ipfs-core/src/dht/put.js @@ -3,7 +3,7 @@ import { expect } from 'aegir/utils/chai.js' import { getDescribe, getIt } from '../utils/mocha.js' import all from 'it-all' -import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' +import { ensureReachable } from './utils.js' /** * @typedef {import('ipfsd-ctl').Factory} Factory @@ -18,31 +18,49 @@ export function testPut (factory, options) { const it = getIt(options) describe('.dht.put', function () { + this.timeout(80 * 1000) + /** @type {import('ipfs-core-types').IPFS} */ let nodeA /** @type {import('ipfs-core-types').IPFS} */ let nodeB - /** @type {import('ipfs-core-types/src/root').IDResult} */ - let nodeBId before(async () => { nodeA = (await factory.spawn()).api nodeB = (await factory.spawn()).api - nodeBId = await nodeB.id() - await nodeA.swarm.connect(nodeBId.addresses[0]) + await ensureReachable(nodeA, nodeB) }) after(() => factory.clean()) it('should put a value to the DHT', async function () { const { cid } = await nodeA.add('should put a value to the DHT') + const publish = await nodeA.name.publish(cid) - const record = await nodeA.dht.get(uint8ArrayFromString(`/ipns/${publish.name}`)) - const value = await all(nodeA.dht.put(uint8ArrayFromString(`/ipns/${publish.name}`), record, { verbose: true })) - expect(value).to.has.length(3) - expect(value[2].id.toString()).to.be.equal(nodeBId.id) - expect(value[2].type).to.be.equal(5) + let record + + for await (const event of nodeA.dht.get(`/ipns/${publish.name}`)) { + if (event.name === 'VALUE') { + record = event.value + break + } + } + + if (!record) { + throw new Error('Could not find value') + } + + const events = await all(nodeA.dht.put(`/ipns/${publish.name}`, record, { verbose: true })) + const peerResponse = events.filter(event => event.name === 'PEER_RESPONSE').pop() + + if (!peerResponse || peerResponse.name !== 'PEER_RESPONSE') { + throw new Error('Did not get peer response') + } + + const nodeBId = await nodeB.id() + + expect(peerResponse.from).to.be.equal(nodeBId.id) }) }) } diff --git a/packages/interface-ipfs-core/src/dht/query.js b/packages/interface-ipfs-core/src/dht/query.js index ba184da66d..103f18cacc 100644 --- a/packages/interface-ipfs-core/src/dht/query.js +++ b/packages/interface-ipfs-core/src/dht/query.js @@ -2,9 +2,9 @@ import { expect } from 'aegir/utils/chai.js' import { getDescribe, getIt } from '../utils/mocha.js' -import all from 'it-all' import drain from 'it-drain' import testTimeout from '../utils/test-timeout.js' +import { ensureReachable } from './utils.js' /** * @typedef {import('ipfsd-ctl').Factory} Factory @@ -25,44 +25,36 @@ export function testQuery (factory, options) { let nodeA /** @type {import('ipfs-core-types').IPFS} */ let nodeB - /** @type {import('ipfs-core-types/src/root').IDResult} */ - let nodeBId before(async () => { nodeA = (await factory.spawn()).api nodeB = (await factory.spawn()).api - const nodeAId = await nodeA.id() - nodeBId = await nodeB.id() - await nodeB.swarm.connect(nodeAId.addresses[0]) + + await ensureReachable(nodeA, nodeB) }) after(() => factory.clean()) - it('should respect timeout option when querying the DHT', () => { + it('should respect timeout option when querying the DHT', async () => { + const nodeBId = await nodeB.id() + return testTimeout(() => drain(nodeA.dht.query(nodeBId.id, { timeout: 1 }))) }) it('should return the other node in the query', async function () { - const timeout = 150 * 1000 - // @ts-ignore this is mocha - this.timeout(timeout) + /** @type {string[]} */ + const peers = [] + const nodeBId = await nodeB.id() - try { - const peers = await all(nodeA.dht.query(nodeBId.id, { timeout: timeout - 1000 })) - expect(peers.map(p => p.id.toString())).to.include(nodeBId.id) - } catch (/** @type {any} */ err) { - if (err.name === 'TimeoutError') { - // This test is meh. DHT works best with >= 20 nodes. Therefore a - // failure might happen, but we don't want to report it as such. - // Hence skip the test before the timeout is reached - // @ts-ignore this is mocha - this.skip() - } else { - throw err + for await (const event of nodeA.dht.query(nodeBId.id)) { + if (event.name === 'PEER_RESPONSE') { + peers.push(...event.closer.map(data => data.id)) } } + + expect(peers).to.include(nodeBId.id) }) }) } diff --git a/packages/interface-ipfs-core/src/dht/utils.js b/packages/interface-ipfs-core/src/dht/utils.js index 0a9b28bad9..ddff9e9884 100644 --- a/packages/interface-ipfs-core/src/dht/utils.js +++ b/packages/interface-ipfs-core/src/dht/utils.js @@ -2,6 +2,7 @@ import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' import { CID } from 'multiformats/cid' import { sha256 } from 'multiformats/hashes/sha2' +import delay from 'delay' /** * @param {Uint8Array} [data] @@ -12,3 +13,41 @@ export async function fakeCid (data) { const mh = await sha256.digest(bytes) return CID.createV0(mh) } + +/** + * @param {import('ipfs-core-types').IPFS} nodeA + * @param {import('ipfs-core-types').IPFS} nodeB + */ +export async function ensureReachable (nodeA, nodeB) { + /** + * @param {import('ipfs-core-types').IPFS} source + * @param {import('ipfs-core-types').IPFS} target + */ + async function canFindOnDHT (source, target) { + const { id } = await target.id() + + for await (const event of source.dht.query(id)) { + if (event.name === 'PEER_RESPONSE' && event.from === id) { + return + } + } + + throw new Error(`Could not find ${id} in DHT`) + } + + const nodeBId = await nodeB.id() + await nodeA.swarm.connect(nodeBId.addresses[0]) + + while (true) { + try { + await Promise.all([ + canFindOnDHT(nodeA, nodeB), + canFindOnDHT(nodeB, nodeA) + ]) + + break + } catch { + await delay(1000) + } + } +} diff --git a/packages/interface-ipfs-core/src/miscellaneous/id.js b/packages/interface-ipfs-core/src/miscellaneous/id.js index 9cd753ea86..83b5f5ed69 100644 --- a/packages/interface-ipfs-core/src/miscellaneous/id.js +++ b/packages/interface-ipfs-core/src/miscellaneous/id.js @@ -55,6 +55,7 @@ export function testId (factory, options) { '/ipfs/bitswap/1.2.0', '/ipfs/id/1.0.0', '/ipfs/id/push/1.0.0', + '/ipfs/lan/kad/1.0.0', '/ipfs/ping/1.0.0', '/libp2p/circuit/relay/0.1.0', '/meshsub/1.0.0', diff --git a/packages/ipfs-cli/package.json b/packages/ipfs-cli/package.json index 62931f433d..b6af9b801a 100644 --- a/packages/ipfs-cli/package.json +++ b/packages/ipfs-cli/package.json @@ -70,7 +70,7 @@ "ipfs-core-utils": "^0.12.2", "ipfs-daemon": "^0.10.4", "ipfs-http-client": "^54.0.2", - "ipfs-repo": "^13.0.4", + "ipfs-repo": "^13.0.6", "ipfs-utils": "^9.0.2", "it-all": "^1.0.4", "it-concat": "^2.0.0", @@ -82,7 +82,7 @@ "it-split": "^1.0.0", "it-tar": "^4.0.0", "jsondiffpatch": "^0.4.1", - "libp2p-crypto": "^0.19.7", + "libp2p-crypto": "^0.21.0", "mafmt": "^10.0.0", "multiaddr": "^10.0.0", "multiaddr-to-uri": "^8.0.0", @@ -103,9 +103,9 @@ "nanoid": "^3.1.23", "ncp": "^2.0.0", "pako": "^2.0.4", - "peer-id": "^0.15.1", + "peer-id": "^0.16.0", "rimraf": "^3.0.2", - "sinon": "^11.1.1", + "sinon": "^12.0.01", "string-argv": "^0.3.1", "temp-write": "^4.0.0" } diff --git a/packages/ipfs-cli/src/commands/dht/find-peer.js b/packages/ipfs-cli/src/commands/dht/find-peer.js index f0b91aada7..880bbcad91 100644 --- a/packages/ipfs-cli/src/commands/dht/find-peer.js +++ b/packages/ipfs-cli/src/commands/dht/find-peer.js @@ -22,9 +22,12 @@ export default { * @param {number} argv.timeout */ async handler ({ ctx: { ipfs, print }, peerId, timeout }) { - const peer = await ipfs.dht.findPeer(peerId, { + for await (const event of ipfs.dht.findPeer(peerId, { timeout - }) - peer.addrs.forEach(addr => print(`${addr}`)) + })) { + if (event.name === 'FINAL_PEER') { + event.peer.multiaddrs.forEach(addr => print(`${addr}`)) + } + } } } diff --git a/packages/ipfs-cli/src/commands/dht/find-providers.js b/packages/ipfs-cli/src/commands/dht/find-providers.js index 70a57d592a..3409f5a6b3 100644 --- a/packages/ipfs-cli/src/commands/dht/find-providers.js +++ b/packages/ipfs-cli/src/commands/dht/find-providers.js @@ -31,11 +31,25 @@ export default { * @param {number} argv.timeout */ async handler ({ ctx: { ipfs, print }, key, numProviders, timeout }) { - for await (const prov of ipfs.dht.findProvs(key, { - numProviders, + const providers = new Set() + + for await (const event of ipfs.dht.findProvs(key, { timeout })) { - print(prov.id.toString()) + if (event.name === 'PROVIDER') { + event.providers.forEach(peerData => { + if (providers.has(peerData.id)) { + return + } + + providers.add(peerData.id) + print(peerData.id.toString()) + }) + + if (providers.size >= numProviders) { + break + } + } } } } diff --git a/packages/ipfs-cli/src/commands/dht/get.js b/packages/ipfs-cli/src/commands/dht/get.js index 221c0c2d59..58f957f1ec 100644 --- a/packages/ipfs-cli/src/commands/dht/get.js +++ b/packages/ipfs-cli/src/commands/dht/get.js @@ -25,9 +25,12 @@ export default { * @param {number} argv.timeout */ async handler ({ ctx: { ipfs, print }, key, timeout }) { - const value = await ipfs.dht.get(key.bytes, { + for await (const event of await ipfs.dht.get(key.bytes, { timeout - }) - print(uint8ArrayToString(value, 'base58btc')) + })) { + if (event.name === 'VALUE') { + print(uint8ArrayToString(event.value, 'base58btc')) + } + } } } diff --git a/packages/ipfs-cli/src/commands/dht/query.js b/packages/ipfs-cli/src/commands/dht/query.js index 06c7a64919..b09fb5845d 100644 --- a/packages/ipfs-cli/src/commands/dht/query.js +++ b/packages/ipfs-cli/src/commands/dht/query.js @@ -22,10 +22,21 @@ export default { * @param {number} argv.timeout */ async handler ({ ctx: { ipfs, print }, peerId, timeout }) { - for await (const result of ipfs.dht.query(peerId, { + const seen = new Set() + + for await (const event of ipfs.dht.query(peerId, { timeout })) { - print(result.id.toString()) + if (event.name === 'PEER_RESPONSE') { + event.closer.forEach(peerData => { + if (seen.has(peerData.id)) { + return + } + + print(peerData.id) + seen.add(peerData.id) + }) + } } } } diff --git a/packages/ipfs-cli/test/dht.spec.js b/packages/ipfs-cli/test/dht.spec.js index 631c1ee2d5..e1a4b69984 100644 --- a/packages/ipfs-cli/test/dht.spec.js +++ b/packages/ipfs-cli/test/dht.spec.js @@ -61,7 +61,10 @@ describe('dht', () => { const key = CID.parse('QmZjTnYw2TFhn9Nn7tjmPSoTBoY7YRkwPzwSrSbabY24Kp') const value = uint8ArrayFromString('testvalue') - ipfs.dht.get.withArgs(key.bytes, defaultOptions).resolves(value) + ipfs.dht.get.withArgs(key.bytes, defaultOptions).returns([{ + name: 'VALUE', + value + }]) const out = await cli(`dht get ${key}`, { ipfs @@ -76,7 +79,10 @@ describe('dht', () => { ipfs.dht.get.withArgs(key.bytes, { ...defaultOptions, timeout: 1000 - }).resolves(value) + }).returns([{ + name: 'VALUE', + value + }]) const out = await cli(`dht get ${key} --timeout=1s`, { ipfs @@ -139,7 +145,6 @@ describe('dht', () => { describe('findprovs', () => { const defaultOptions = { - numProviders: 20, timeout: undefined } const key = CID.parse('QmZjTnYw2TFhn9Nn7tjmPSoTBoY7YRkwPzwSrSbabY24Kp') @@ -148,9 +153,12 @@ describe('dht', () => { } it('should be able to find providers for data', async () => { - ipfs.dht.findProvs.withArgs(key, defaultOptions).returns([ - prov - ]) + ipfs.dht.findProvs.withArgs(key, defaultOptions).returns([{ + name: 'PROVIDER', + providers: [ + prov + ] + }]) const out = await cli(`dht findprovs ${key}`, { ipfs }) expect(out).to.equal(`${prov.id}\n`) @@ -158,11 +166,13 @@ describe('dht', () => { it('should be able to find smaller number of providers for data', async () => { ipfs.dht.findProvs.withArgs(key, { - ...defaultOptions, - numProviders: 5 - }).returns([ - prov - ]) + ...defaultOptions + }).returns([{ + name: 'PROVIDER', + providers: [ + prov + ] + }]) const out = await cli(`dht findprovs ${key} --num-providers 5`, { ipfs }) expect(out).to.equal(`${prov.id}\n`) @@ -172,9 +182,12 @@ describe('dht', () => { ipfs.dht.findProvs.withArgs(key, { ...defaultOptions, timeout: 1000 - }).returns([ - prov - ]) + }).returns([{ + name: 'PROVIDER', + providers: [ + prov + ] + }]) const out = await cli(`dht findprovs ${key} --timeout=1s`, { ipfs }) expect(out).to.equal(`${prov.id}\n`) @@ -187,27 +200,33 @@ describe('dht', () => { } const peerId = 'QmZjTnYw2TFhn9Nn7tjmPSoTBoY7YRkwPzwSrSbabY24Kp' const peer = { - addrs: [ + multiaddrs: [ 'addr' ] } it('should find a peer', async () => { - ipfs.dht.findPeer.withArgs(peerId, defaultOptions).returns(peer) + ipfs.dht.findPeer.withArgs(peerId, defaultOptions).returns([{ + name: 'FINAL_PEER', + peer + }]) const out = await cli(`dht findpeer ${peerId}`, { ipfs }) - expect(out).to.equal(`${peer.addrs[0]}\n`) + expect(out).to.equal(`${peer.multiaddrs[0]}\n`) }) it('should find a peer with a timeout', async () => { ipfs.dht.findPeer.withArgs(peerId.toString(), { ...defaultOptions, timeout: 1000 - }).returns(peer) + }).returns([{ + name: 'FINAL_PEER', + peer + }]) const out = await cli(`dht findpeer ${peerId} --timeout=1s`, { ipfs }) - expect(out).to.equal(`${peer.addrs[0]}\n`) + expect(out).to.equal(`${peer.multiaddrs[0]}\n`) }) }) @@ -221,23 +240,27 @@ describe('dht', () => { } it('should query the DHT', async () => { - // https://github.com/libp2p/js-peer-id/issues/141 - ipfs.dht.query.withArgs(peerId, defaultOptions).returns([ - peer - ]) + ipfs.dht.query.withArgs(peerId, defaultOptions).returns([{ + name: 'PEER_RESPONSE', + closer: [ + peer + ] + }]) const out = await cli(`dht query ${peerId}`, { ipfs }) expect(out).to.equal(`${peer.id}\n`) }) it('should query the DHT with a timeout', async () => { - // https://github.com/libp2p/js-peer-id/issues/141 ipfs.dht.query.withArgs(peerId, { ...defaultOptions, timeout: 1000 - }).returns([ - peer - ]) + }).returns([{ + name: 'PEER_RESPONSE', + closer: [ + peer + ] + }]) const out = await cli(`dht query ${peerId} --timeout=1s`, { ipfs }) expect(out).to.equal(`${peer.id}\n`) diff --git a/packages/ipfs-core-config/package.json b/packages/ipfs-core-config/package.json index 461a427270..5defa96790 100644 --- a/packages/ipfs-core-config/package.json +++ b/packages/ipfs-core-config/package.json @@ -79,7 +79,7 @@ }, "license": "MIT", "dependencies": { - "@chainsafe/libp2p-noise": "^4.0.0", + "@achingbrain/libp2p-noise": "^5.0.0", "blockstore-datastore-adapter": "^2.0.2", "datastore-core": "^6.0.7", "datastore-fs": "^6.0.1", @@ -87,19 +87,19 @@ "debug": "^4.1.1", "err-code": "^3.0.1", "hashlru": "^2.3.0", - "ipfs-repo": "^13.0.4", + "ipfs-repo": "^13.0.6", "ipfs-utils": "^9.0.2", - "ipns": "^0.15.0", + "ipns": "^0.16.0", "is-ipfs": "^6.0.1", "it-all": "^1.0.4", "it-drain": "^1.0.3", - "libp2p-floodsub": "^0.27.0", - "libp2p-gossipsub": "^0.11.1", - "libp2p-kad-dht": "^0.25.0", - "libp2p-mdns": "^0.17.0", + "libp2p-floodsub": "^0.28.0", + "libp2p-gossipsub": "^0.12.0", + "libp2p-kad-dht": "^0.27.0", + "libp2p-mdns": "^0.18.0", "libp2p-mplex": "^0.10.2", "libp2p-tcp": "^0.17.1", - "libp2p-webrtc-star": "^0.24.0", + "libp2p-webrtc-star": "^0.25.0", "libp2p-websockets": "^0.16.2", "p-queue": "^6.6.1", "uint8arrays": "^3.0.0" diff --git a/packages/ipfs-core-config/src/config.browser.js b/packages/ipfs-core-config/src/config.browser.js index 90fcb07ea9..b3225f64ad 100644 --- a/packages/ipfs-core-config/src/config.browser.js +++ b/packages/ipfs-core-config/src/config.browser.js @@ -40,12 +40,12 @@ export default () => ({ }, Swarm: { ConnMgr: { - LowWater: 200, - HighWater: 500 + LowWater: 5, + HighWater: 20 }, DisableNatPortMap: true }, Routing: { - Type: 'none' + Type: 'dhtclient' } }) diff --git a/packages/ipfs-core-config/src/config.js b/packages/ipfs-core-config/src/config.js index 309cc2155c..3185b6c67a 100644 --- a/packages/ipfs-core-config/src/config.js +++ b/packages/ipfs-core-config/src/config.js @@ -45,12 +45,12 @@ export default () => ({ }, Swarm: { ConnMgr: { - LowWater: 200, - HighWater: 500 + LowWater: 50, + HighWater: 200 }, DisableNatPortMap: false }, Routing: { - Type: 'none' + Type: 'dhtclient' } }) diff --git a/packages/ipfs-core-config/src/libp2p.browser.js b/packages/ipfs-core-config/src/libp2p.browser.js index 5874205bca..4c6e7e2df9 100644 --- a/packages/ipfs-core-config/src/libp2p.browser.js +++ b/packages/ipfs-core-config/src/libp2p.browser.js @@ -5,7 +5,7 @@ import WS from 'libp2p-websockets' import WebRTCStar from 'libp2p-webrtc-star' // @ts-expect-error - no types import Multiplex from 'libp2p-mplex' -import { NOISE } from '@chainsafe/libp2p-noise' +import { NOISE } from '@achingbrain/libp2p-noise' import KadDHT from 'libp2p-kad-dht' import GossipSub from 'libp2p-gossipsub' import { validator, selector } from './utils/ipns.js' @@ -47,11 +47,8 @@ export function libp2pConfig () { }, dht: { kBucketSize: 20, - enabled: false, + enabled: true, clientMode: true, - randomWalk: { - enabled: false - }, validators: { ipns: validator }, diff --git a/packages/ipfs-core-config/src/libp2p.js b/packages/ipfs-core-config/src/libp2p.js index 68b78cfd70..73160d83e6 100644 --- a/packages/ipfs-core-config/src/libp2p.js +++ b/packages/ipfs-core-config/src/libp2p.js @@ -7,7 +7,7 @@ import KadDHT from 'libp2p-kad-dht' import GossipSub from 'libp2p-gossipsub' // @ts-expect-error - no types import Multiplex from 'libp2p-mplex' -import { NOISE } from '@chainsafe/libp2p-noise' +import { NOISE } from '@achingbrain/libp2p-noise' import { validator, selector } from './utils/ipns.js' import os from 'os' @@ -51,11 +51,8 @@ export function libp2pConfig () { }, dht: { kBucketSize: 20, - enabled: false, + enabled: true, clientMode: true, - randomWalk: { - enabled: false - }, validators: { ipns: validator }, diff --git a/packages/ipfs-core-types/src/dht/index.ts b/packages/ipfs-core-types/src/dht/index.ts index 1452bb95c1..209dfc048b 100644 --- a/packages/ipfs-core-types/src/dht/index.ts +++ b/packages/ipfs-core-types/src/dht/index.ts @@ -21,7 +21,7 @@ export interface API { * // '/ip4/147.75.94.115/tcp/4001' * ``` */ - findPeer: (peerId: string, options?: AbortOptions & OptionExtension) => Promise + findPeer: (peerId: string, options?: AbortOptions & OptionExtension) => AsyncIterable /** * Find peers in the DHT that can provide a specific value, given a CID. @@ -34,17 +34,17 @@ export interface API { * } * ``` */ - findProvs: (cid: CID, options?: DHTFindProvsOptions & OptionExtension) => AsyncIterable + findProvs: (cid: CID, options?: AbortOptions & OptionExtension) => AsyncIterable /** * Given a key, query the DHT for its best value. */ - get: (key: Uint8Array, options?: AbortOptions & OptionExtension) => Promise + get: (key: string | Uint8Array, options?: AbortOptions & OptionExtension) => AsyncIterable /** * Announce to the network that we are providing given values. */ - provide: (cid: CID | CID[], options?: DHTProvideOptions & OptionExtension) => AsyncIterable + provide: (cid: CID, options?: DHTProvideOptions & OptionExtension) => AsyncIterable /** * Write a key/value pair to the DHT. @@ -52,14 +52,13 @@ export interface API { * Given a key of the form /foo/bar and a value of any * form, this will write that value to the DHT with * that key. - * */ - put: (key: Uint8Array, value: Uint8Array, options?: AbortOptions & OptionExtension) => AsyncIterable + put: (key: string | Uint8Array, value: Uint8Array, options?: AbortOptions & OptionExtension) => AsyncIterable /** - * Find the closest peers to a given `PeerId`, by querying the DHT. + * Find the closest peers to a given `PeerId` or `CID`, by querying the DHT. */ - query: (peerId: string, options?: AbortOptions & OptionExtension) => AsyncIterable + query: (peerId: string | CID, options?: AbortOptions & OptionExtension) => AsyncIterable } export interface PeerResult { @@ -67,28 +66,101 @@ export interface PeerResult { addrs: Multiaddr[] } -export interface DHTFindProvsOptions extends AbortOptions { - numProviders?: number -} - export interface DHTProvideOptions extends AbortOptions { recursive?: boolean } -export enum QueryEventType { - SendingQuery = 1, - PeerResponse, - FinalPeer, - QueryError, - Provider, - Value, - AddingPeer, - DialingPeer +export interface PeerData { + id: string + multiaddrs: Multiaddr[] } -export interface DHTQueryMessage { - extra: string - id: string - responses: PeerResult[] - type: QueryEventType +export enum EventTypes { + SENDING_QUERY = 0, + PEER_RESPONSE, + FINAL_PEER, + QUERY_ERROR, + PROVIDER, + VALUE, + ADDING_PEER, + DIALING_PEER +} + +/** + * The types of messages set/received during DHT queries + */ +export enum MessageType { + PUT_VALUE = 0, + GET_VALUE, + ADD_PROVIDER, + GET_PROVIDERS, + FIND_NODE, + PING +} + +export type MessageName = keyof typeof MessageType + +export interface DHTRecord { + key: Uint8Array + value: Uint8Array + timeReceived?: Date +} + +export interface SendingQueryEvent { + to: string + type: EventTypes.SENDING_QUERY + name: 'SENDING_QUERY' +} + +export interface PeerResponseEvent { + from: string + type: EventTypes.PEER_RESPONSE + name: 'PEER_RESPONSE' + messageType: MessageType + messageName: MessageName + providers: PeerData[] + closer: PeerData[] + record?: DHTRecord } + +export interface FinalPeerEvent { + from: string + peer: PeerData + type: EventTypes.FINAL_PEER + name: 'FINAL_PEER' +} + +export interface QueryErrorEvent { + from: string + type: EventTypes.QUERY_ERROR + name: 'QUERY_ERROR' + error: Error +} + +export interface ProviderEvent { + from: string + type: EventTypes.PROVIDER + name: 'PROVIDER' + providers: PeerData[] +} + +export interface ValueEvent { + from: string + type: EventTypes.VALUE + name: 'VALUE' + value: Uint8Array +} + +export interface AddingPeerEvent { + type: EventTypes.ADDING_PEER + name: 'ADDING_PEER' + peer: string +} + +export interface DialingPeerEvent { + peer: string + type: EventTypes.DIALING_PEER + name: 'DIALING_PEER' +} + +export type QueryEvent = SendingQueryEvent | PeerResponseEvent | FinalPeerEvent | QueryErrorEvent | ProviderEvent | ValueEvent | AddingPeerEvent | DialingPeerEvent diff --git a/packages/ipfs-core-types/src/name/index.ts b/packages/ipfs-core-types/src/name/index.ts index 495c35f325..a3f83287c9 100644 --- a/packages/ipfs-core-types/src/name/index.ts +++ b/packages/ipfs-core-types/src/name/index.ts @@ -27,7 +27,7 @@ export interface API { * * @example * ```js - * // The IPNS address you want to resolve. + * // The IPNS address you want to resolve * const addr = '/ipns/ipfs.io' * * for await (const name of ipfs.name.resolve(addr)) { diff --git a/packages/ipfs-core-utils/package.json b/packages/ipfs-core-utils/package.json index dbd9517792..52af5ed14c 100644 --- a/packages/ipfs-core-utils/package.json +++ b/packages/ipfs-core-utils/package.json @@ -130,7 +130,7 @@ "multiformats": "^9.4.13", "nanoid": "^3.1.23", "parse-duration": "^1.0.0", - "timeout-abort-controller": "^1.1.1", + "timeout-abort-controller": "^2.0.0", "uint8arrays": "^3.0.0" }, "devDependencies": { diff --git a/packages/ipfs-core-utils/src/with-timeout-option.js b/packages/ipfs-core-utils/src/with-timeout-option.js index f0e4c5fa51..550ccdcda7 100644 --- a/packages/ipfs-core-utils/src/with-timeout-option.js +++ b/packages/ipfs-core-utils/src/with-timeout-option.js @@ -1,7 +1,6 @@ /* eslint-disable no-unreachable */ -// @ts-expect-error no types -import TimeoutController from 'timeout-abort-controller' +import { TimeoutController } from 'timeout-abort-controller' import { anySignal } from 'any-signal' import parseDuration from 'parse-duration' import { TimeoutError } from './errors.js' diff --git a/packages/ipfs-core/package.json b/packages/ipfs-core/package.json index 0c2bec7bf7..a92287e20f 100644 --- a/packages/ipfs-core/package.json +++ b/packages/ipfs-core/package.json @@ -65,7 +65,7 @@ "dep-check": "aegir dep-check -i interface-ipfs-core -i ipfs-core-types -i abort-controller -i npm-run-all --i interface-blockstore" }, "dependencies": { - "@chainsafe/libp2p-noise": "^4.0.0", + "@achingbrain/libp2p-noise": "^5.0.0", "@ipld/car": "^3.1.0", "@ipld/dag-cbor": "^6.0.5", "@ipld/dag-pb": "^2.1.3", @@ -83,17 +83,17 @@ "hashlru": "^2.3.0", "interface-blockstore": "^2.0.2", "interface-datastore": "^6.0.2", - "ipfs-bitswap": "^7.0.1", + "ipfs-bitswap": "^9.0.0", "ipfs-core-config": "^0.1.4", "ipfs-core-types": "^0.8.4", "ipfs-core-utils": "^0.12.2", "ipfs-http-client": "^54.0.2", - "ipfs-repo": "^13.0.4", + "ipfs-repo": "^13.0.6", "ipfs-unixfs": "^6.0.3", "ipfs-unixfs-exporter": "^7.0.3", "ipfs-unixfs-importer": "^9.0.3", "ipfs-utils": "^9.0.2", - "ipns": "^0.15.0", + "ipns": "^0.16.0", "is-domain-name": "^1.0.1", "is-ipfs": "^6.0.1", "it-all": "^1.0.4", @@ -103,18 +103,18 @@ "it-last": "^1.0.4", "it-map": "^1.0.4", "it-merge": "^1.0.2", - "it-parallel": "^1.0.0", + "it-parallel": "^2.0.1", "it-peekable": "^1.0.2", "it-pipe": "^1.1.0", "it-pushable": "^1.4.2", "it-tar": "^4.0.0", "it-to-buffer": "^2.0.0", "just-safe-set": "^2.2.1", - "libp2p": "^0.33.0", - "libp2p-bootstrap": "^0.13.0", - "libp2p-crypto": "^0.19.7", - "libp2p-delegated-content-routing": "^0.11.0", - "libp2p-delegated-peer-routing": "^0.10.0", + "libp2p": "^0.35.0", + "libp2p-bootstrap": "^0.14.0", + "libp2p-crypto": "^0.21.0", + "libp2p-delegated-content-routing": "^0.11.1", + "libp2p-delegated-peer-routing": "^0.11.0", "libp2p-record": "^0.10.3", "mafmt": "^10.0.0", "merge-options": "^3.0.4", @@ -125,8 +125,8 @@ "native-abort-controller": "^1.0.3", "pako": "^1.0.2", "parse-duration": "^1.0.0", - "peer-id": "^0.15.1", - "timeout-abort-controller": "^1.1.1", + "peer-id": "^0.16.0", + "timeout-abort-controller": "^2.0.0", "uint8arrays": "^3.0.0" }, "devDependencies": { @@ -140,12 +140,12 @@ "interface-ipfs-core": "^0.152.2", "ipfsd-ctl": "^10.0.4", "iso-url": "^1.0.0", - "libp2p-gossipsub": "^0.11.1", + "libp2p-gossipsub": "^0.12.0", "nanoid": "^3.1.23", "npm-run-all": "^4.1.5", "p-defer": "^3.0.0", "rimraf": "^3.0.2", - "sinon": "^11.1.1" + "sinon": "^12.0.01" }, "gitHead": "" } diff --git a/packages/ipfs-core/src/components/add-all/index.js b/packages/ipfs-core/src/components/add-all/index.js index f69c6f54e9..c6635dc704 100644 --- a/packages/ipfs-core/src/components/add-all/index.js +++ b/packages/ipfs-core/src/components/add-all/index.js @@ -16,7 +16,6 @@ const mergeOptions = mergeOpts.bind({ ignoreUndefined: true }) * @property {import('ipfs-repo').IPFSRepo} repo * @property {import('../../types').Preload} preload * @property {import('ipfs-core-types/src/root').ShardingOptions} [options] - * * @param {Context} context */ export function createAddAll ({ repo, preload, options }) { diff --git a/packages/ipfs-core/src/components/block/rm.js b/packages/ipfs-core/src/components/block/rm.js index d34774e26e..55ef03d66f 100644 --- a/packages/ipfs-core/src/components/block/rm.js +++ b/packages/ipfs-core/src/components/block/rm.js @@ -53,7 +53,7 @@ export function createRm ({ repo }) { return result } }), - source => parallel(source, BLOCK_RM_CONCURRENCY), + source => parallel(source, { concurrency: BLOCK_RM_CONCURRENCY }), source => filter(source, () => !options.quiet) ) } finally { diff --git a/packages/ipfs-core/src/components/dht.js b/packages/ipfs-core/src/components/dht.js index 1f47f6d17e..b0b27ab78d 100644 --- a/packages/ipfs-core/src/components/dht.js +++ b/packages/ipfs-core/src/components/dht.js @@ -3,72 +3,213 @@ import errCode from 'err-code' import { NotEnabledError } from '../errors.js' import get from 'dlv' import { withTimeoutOption } from 'ipfs-core-utils/with-timeout-option' +import map from 'it-map' +import { CID } from 'multiformats/cid' +import { base58btc } from 'multiformats/bases/base58' +import { base36 } from 'multiformats/bases/base36' +import { concat as uint8ArrayConcat } from 'uint8arrays/concat' +import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' + +/** + * @typedef {import('libp2p-kad-dht').QueryEvent} DHTQueryEvent + * @typedef {import('ipfs-core-types/src/dht').QueryEvent} QueryEvent + */ + +const IPNS_PREFIX = '/ipns/' + +/** + * @param {string} str + */ +function toDHTKey (str) { + if (str.startsWith(IPNS_PREFIX)) { + str = str.substring(IPNS_PREFIX.length) + } + + /** @type {Uint8Array|undefined} */ + let buf + + if (str[0] === '1' || str[0] === 'Q') { + // ed25519 key or hash of rsa key + str = `z${str}` + } + + if (str[0] === 'z') { + buf = base58btc.decode(str) + } + + if (str[0] === 'k') { + // base36 encoded string + buf = base36.decode(str) + } + + if (!buf) { + throw new Error('Could not parse string') + } + + if (buf[0] !== 0x01 && buf[1] !== 0x72) { + // prefix key with CIDv1 and libp2p-key codec + buf = uint8ArrayConcat([ + [0x01, 0x72], + buf + ]) + } + + if (buf.length !== 40) { + throw new Error('Incorrect length ' + buf.length) + } + + return uint8ArrayConcat([ + uint8ArrayFromString(IPNS_PREFIX), + buf.subarray(2) + ]) +} + +/** + * @param {DHTQueryEvent} event + * @returns {QueryEvent} + */ +function mapEvent (event) { + if (event.name === 'SENDING_QUERY') { + return { + type: event.type, + name: event.name, + to: event.to.toB58String() + } + } + + if (event.name === 'PEER_RESPONSE') { + return { + type: event.type, + name: event.name, + messageType: event.messageType, + messageName: event.messageName, + closer: event.closer.map(({ id, multiaddrs }) => ({ id: id.toB58String(), multiaddrs })), + providers: event.providers.map(({ id, multiaddrs }) => ({ id: id.toB58String(), multiaddrs })), + record: event.record, + from: event.from.toB58String() + } + } + + if (event.name === 'FINAL_PEER') { + return { + type: event.type, + name: event.name, + from: event.from.toB58String(), + peer: { + id: event.peer.id.toB58String(), + multiaddrs: event.peer.multiaddrs + } + } + } + + if (event.name === 'QUERY_ERROR') { + return { + type: event.type, + name: event.name, + error: event.error, + from: event.from.toB58String() + } + } + + if (event.name === 'PROVIDER') { + return { + type: event.type, + name: event.name, + providers: event.providers.map(({ id, multiaddrs }) => ({ id: id.toB58String(), multiaddrs })), + from: event.from.toB58String() + } + } + + if (event.name === 'VALUE') { + return { + type: event.type, + name: event.name, + value: event.value, + from: event.from.toB58String() + } + } + + if (event.name === 'ADDING_PEER') { + return { + type: event.type, + name: event.name, + peer: event.peer.toB58String() + } + } + + if (event.name === 'DIALING_PEER') { + return { + type: event.type, + name: event.name, + peer: event.peer.toB58String() + } + } + + throw errCode(new Error('Unknown DHT event type'), 'ERR_UNKNOWN_DHT_EVENT') +} /** * @param {Object} config * @param {import('../types').NetworkService} config.network * @param {import('ipfs-repo').IPFSRepo} config.repo + * @param {PeerId} config.peerId */ -export function createDht ({ network, repo }) { +export function createDht ({ network, repo, peerId }) { const { get, put, findProvs, findPeer, provide, query } = { /** * @type {import('ipfs-core-types/src/dht').API["get"]} */ - async get (key, options = {}) { - const { libp2p } = await use(network, options) - return libp2p._dht.get(key, options) + async * get (key, options = {}) { + const { libp2p } = await use(network, peerId, options) + + const dhtKey = key instanceof Uint8Array ? key : toDHTKey(key) + + yield * map(libp2p._dht.get(dhtKey, options), mapEvent) }, /** * @type {import('ipfs-core-types/src/dht').API["put"]} */ async * put (key, value, options) { - const { libp2p } = await use(network, options) - yield * libp2p._dht.put(key, value) + const { libp2p } = await use(network, peerId, options) + + const dhtKey = key instanceof Uint8Array ? key : toDHTKey(key) + + yield * map(libp2p._dht.put(dhtKey, value), mapEvent) }, /** * @type {import('ipfs-core-types/src/dht').API["findProvs"]} */ async * findProvs (cid, options = { numProviders: 20 }) { - const { libp2p } = await use(network, options) + const { libp2p } = await use(network, peerId, options) - for await (const peer of libp2p._dht.findProviders(cid, { - maxNumProviders: options.numProviders, + yield * map(libp2p._dht.findProviders(cid, { signal: options.signal - })) { - yield { - id: peer.id.toB58String(), - addrs: peer.addrs - } - } + }), mapEvent) }, /** * @type {import('ipfs-core-types/src/dht').API["findPeer"]} */ - async findPeer (peerId, options) { - const { libp2p } = await use(network, options) - const peer = await libp2p._dht.findPeer(PeerId.parse(peerId)) + async * findPeer (peerIdToFind, options = {}) { + const { libp2p } = await use(network, peerId, options) - return { - id: peer.id.toB58String(), - addrs: peer.multiaddrs - } + yield * map(libp2p._dht.findPeer(PeerId.parse(peerIdToFind), { + signal: options.signal + }), mapEvent) }, /** * @type {import('ipfs-core-types/src/dht').API["provide"]} */ - async * provide (cids, options = { recursive: false }) { - const { libp2p } = await use(network, options) - const cidArr = Array.isArray(cids) ? cids : [cids] + async * provide (cid, options = { recursive: false }) { + const { libp2p } = await use(network, peerId, options) // ensure blocks are actually local - const hasCids = await Promise.all(cidArr.map(cid => repo.blocks.has(cid))) - const hasAll = hasCids.every(has => has) + const hasBlock = await repo.blocks.has(cid) - if (!hasAll) { + if (!hasBlock) { throw errCode(new Error('block(s) not found locally, cannot provide'), 'ERR_BLOCK_NOT_FOUND') } @@ -77,23 +218,24 @@ export function createDht ({ network, repo }) { throw errCode(new Error('not implemented yet'), 'ERR_NOT_IMPLEMENTED_YET') } - for (const cid of cidArr) { - yield libp2p._dht.provide(cid) - } + yield * map(libp2p._dht.provide(cid), mapEvent) }, /** * @type {import('ipfs-core-types/src/dht').API["query"]} */ - async * query (peerId, options) { - const { libp2p } = await use(network, options) + async * query (peerIdToQuery, options = {}) { + const { libp2p } = await use(network, peerId, options) + let bytes + const asCid = CID.asCID(peerIdToQuery) - for await (const closerPeerId of libp2p._dht.getClosestPeers(PeerId.parse(peerId).toBytes())) { - yield { - id: closerPeerId.toB58String(), - addrs: [] // TODO: get addrs? - } + if (asCid != null) { + bytes = asCid.multihash.bytes + } else { + bytes = PeerId.parse(peerIdToQuery.toString()).toBytes() } + + yield * map(libp2p._dht.getClosestPeers(bytes, options), mapEvent) } } @@ -109,13 +251,34 @@ export function createDht ({ network, repo }) { /** * @param {import('../types').NetworkService} network + * @param {PeerId} peerId * @param {import('ipfs-core-types/src/utils').AbortOptions} [options] */ -const use = async (network, options) => { +const use = async (network, peerId, options) => { const net = await network.use(options) if (get(net.libp2p, '_config.dht.enabled', false)) { return net } else { - throw new NotEnabledError('dht not enabled') + const fn = async function * () { + yield { + from: peerId, + name: 'QUERY_ERROR', + type: 3, + error: new NotEnabledError('dht not enabled') + } + } + + return { + libp2p: { + _dht: { + get: fn, + put: fn, + findProvs: fn, + findPeer: fn, + provide: fn, + query: fn + } + } + } } } diff --git a/packages/ipfs-core/src/components/id.js b/packages/ipfs-core/src/components/id.js index af3ae4681e..163c00e805 100644 --- a/packages/ipfs-core/src/components/id.js +++ b/packages/ipfs-core/src/components/id.js @@ -41,7 +41,12 @@ export function createId ({ peerId, network }) { const id = options.peerId ? PeerId.createFromB58String(options.peerId.toString()) : peerId const { libp2p } = net - const publicKey = options.peerId ? libp2p.peerStore.keyBook.get(id) : id.pubKey + let publicKey = id.pubKey ? id.pubKey : libp2p.peerStore.keyBook.get(id) + + if (!publicKey) { + publicKey = await libp2p._dht.getPublicKey(id, options) + } + const addresses = options.peerId ? libp2p.peerStore.addressBook.getMultiaddrsForPeer(id) : libp2p.multiaddrs const protocols = options.peerId ? libp2p.peerStore.protoBook.get(id) : Array.from(libp2p.upgrader.protocols.keys()) const agentVersion = uint8ArrayToString(libp2p.peerStore.metadataBook.getValue(id, 'AgentVersion') || new Uint8Array()) diff --git a/packages/ipfs-core/src/components/index.js b/packages/ipfs-core/src/components/index.js index f815be6dde..d81bacb6ea 100644 --- a/packages/ipfs-core/src/components/index.js +++ b/packages/ipfs-core/src/components/index.js @@ -167,7 +167,7 @@ class IPFS { repo }) - this.dht = createDht({ network, repo }) + this.dht = createDht({ network, repo, peerId }) this.pubsub = createPubsub({ network, config: options.config }) this.dns = dns this.isOnline = isOnline diff --git a/packages/ipfs-core/src/components/libp2p.js b/packages/ipfs-core/src/components/libp2p.js index 44171a7852..29cc2030cb 100644 --- a/packages/ipfs-core/src/components/libp2p.js +++ b/packages/ipfs-core/src/components/libp2p.js @@ -12,6 +12,7 @@ import { ipfsCore as pkgversion } from '../version.js' import { libp2pConfig as getEnvLibp2pOptions } from 'ipfs-core-config/libp2p' import bootstrap from 'libp2p-bootstrap' import Libp2p from 'libp2p' +import * as ipns from 'ipns' const mergeOptions = mergeOpts.bind({ ignoreUndefined: true }) @@ -129,7 +130,10 @@ function getLibp2pOptions ({ options, config, datastore, keys, keychainConfig, p dht: { enabled: get(config, 'Routing.Type', 'none') !== 'none', clientMode: get(config, 'Routing.Type', 'dht') !== 'dhtserver', - kBucketSize: get(options, 'dht.kBucketSize', 20) + kBucketSize: get(options, 'dht.kBucketSize', 20), + validators: { + ipns: ipns.validator + } }, pubsub: { enabled: get(options, 'config.Pubsub.Enabled', get(config, 'Pubsub.Enabled', true)) diff --git a/packages/ipfs-core/src/components/name/resolve.js b/packages/ipfs-core/src/components/name/resolve.js index 9f7aff7f1b..5482cc92dc 100644 --- a/packages/ipfs-core/src/components/name/resolve.js +++ b/packages/ipfs-core/src/components/name/resolve.js @@ -2,6 +2,8 @@ import debug from 'debug' import errcode from 'err-code' import mergeOpts from 'merge-options' import { CID } from 'multiformats/cid' +import * as Digest from 'multiformats/hashes/digest' +import { base36 } from 'multiformats/bases/base36' import PeerId from 'peer-id' // @ts-expect-error no types import isDomain from 'is-domain-name' @@ -50,6 +52,11 @@ export function createResolve ({ dns, ipns, peerId, isOnline, options: { offline throw errcode(new Error('cannot specify both offline and nocache'), 'ERR_NOCACHE_AND_OFFLINE') } + // IPNS resolve needs a online daemon + if (!isOnline() && !offline) { + throw errcode(new Error(OFFLINE_ERROR), 'OFFLINE_ERROR') + } + // Set node id as name for being resolved, if it is not received if (!name) { name = peerId.toB58String() @@ -59,12 +66,20 @@ export function createResolve ({ dns, ipns, peerId, isOnline, options: { offline name = `/ipns/${name}` } - const [namespace, hash, ...remainder] = name.slice(1).split('/') + let [namespace, hash, ...remainder] = name.slice(1).split('/') + try { if (hash.substring(0, 1) === '1') { - PeerId.parse(hash) + const id = PeerId.parse(hash) + const digest = Digest.decode(id.toBytes()) + const libp2pKey = CID.createV1(0x72, digest) + hash = libp2pKey.toString(base36) } else { - CID.parse(hash) + const cid = CID.parse(hash) + + if (cid.version === 1) { + hash = cid.toString(base36) + } } } catch (/** @type {any} */ err) { // lets check if we have a domain ex. /ipns/ipfs.io and resolve with dns @@ -78,11 +93,6 @@ export function createResolve ({ dns, ipns, peerId, isOnline, options: { offline } // multihash is valid lets resolve with IPNS - // IPNS resolve needs a online daemon - if (!isOnline() && !offline) { - throw errcode(new Error(OFFLINE_ERROR), 'OFFLINE_ERROR') - } - // TODO: convert ipns.resolve to return an iterator const value = await ipns.resolve(`/${namespace}/${hash}`, options) yield appendRemainder(value instanceof Uint8Array ? uint8ArrayToString(value) : value, remainder) diff --git a/packages/ipfs-core/src/components/refs/index.js b/packages/ipfs-core/src/components/refs/index.js index 6348808335..a1560a9477 100644 --- a/packages/ipfs-core/src/components/refs/index.js +++ b/packages/ipfs-core/src/components/refs/index.js @@ -2,8 +2,7 @@ import * as dagPB from '@ipld/dag-pb' import { notFoundError } from 'datastore-core/errors' import { toCidAndPath } from 'ipfs-core-utils/to-cid-and-path' import { CID } from 'multiformats/cid' -// @ts-expect-error no types -import TimeoutController from 'timeout-abort-controller' +import { TimeoutController } from 'timeout-abort-controller' import { anySignal } from 'any-signal' const ERR_NOT_FOUND = notFoundError().code diff --git a/packages/ipfs-core/src/ipns/publisher.js b/packages/ipfs-core/src/ipns/publisher.js index 9e2a07cc98..26c752a8d5 100644 --- a/packages/ipfs-core/src/ipns/publisher.js +++ b/packages/ipfs-core/src/ipns/publisher.js @@ -71,18 +71,12 @@ export class IpnsPublisher { throw errcode(new Error(errMsg), 'ERR_INVALID_PEER_ID') } - // @ts-ignore - accessing private property isn't allowed - const publicKey = peerId._pubKey + const publicKey = peerId.pubKey const embedPublicKeyRecord = await ipns.embedPublicKey(publicKey, record) const keys = ipns.getIdKeys(peerId.toBytes()) await this._publishEntry(keys.routingKey, embedPublicKeyRecord || record) - // Publish the public key to support old go-ipfs nodes that are looking for it in the routing - // We will be able to deprecate this part in the future, since the public keys will be only - // in IPNS record and the peerId. - await this._publishPublicKey(keys.routingPubKey, publicKey) - return embedPublicKeyRecord || record } @@ -114,47 +108,11 @@ export class IpnsPublisher { // Add record to routing (buffer key) try { const res = await this._routing.put(k.uint8Array(), entryData) - log(`ipns record for ${uint8ArrayToString(k.uint8Array(), 'base64')} was stored in the routing`) - - return res - } catch (/** @type {any} */err) { - const errMsg = `ipns record for ${uint8ArrayToString(k.uint8Array(), 'base64')} could not be stored in the routing` - log.error(errMsg) - log.error(err) - - throw errcode(new Error(errMsg), 'ERR_PUTTING_TO_ROUTING') - } - } - - /** - * @param {Key} key - * @param {PublicKey} publicKey - */ - async _publishPublicKey (key, publicKey) { - const k = Key.asKey(key) - - if (!k) { - const errMsg = 'datastore key does not have a valid format' - log.error(errMsg) - - throw errcode(new Error(errMsg), 'ERR_INVALID_DATASTORE_KEY') - } - - if (!publicKey || !publicKey.bytes) { - const errMsg = 'one or more of the provided parameters are not defined' - log.error(errMsg) - - throw errcode(new Error(errMsg), 'ERR_UNDEFINED_PARAMETER') - } - - // Add public key to routing (buffer key) - try { - const res = await this._routing.put(k.uint8Array(), publicKey.bytes) - log(`public key for ${uint8ArrayToString(k.uint8Array(), 'base64')} was stored in the routing`) + log(`ipns record for ${uint8ArrayToString(k.uint8Array(), 'base32')} was stored in the routing`) return res } catch (/** @type {any} */err) { - const errMsg = `public key for ${uint8ArrayToString(k.uint8Array(), 'base64')} could not be stored in the routing` + const errMsg = `ipns record for ${uint8ArrayToString(k.uint8Array(), 'base32')} could not be stored in the routing` log.error(errMsg) log.error(err) diff --git a/packages/ipfs-core/src/ipns/routing/config.js b/packages/ipfs-core/src/ipns/routing/config.js index 070d0ab5ee..667501138a 100644 --- a/packages/ipfs-core/src/ipns/routing/config.js +++ b/packages/ipfs-core/src/ipns/routing/config.js @@ -2,6 +2,11 @@ import { TieredDatastore } from 'datastore-core/tiered' import get from 'dlv' import { IpnsPubsubDatastore } from './pubsub-datastore.js' import { OfflineDatastore } from './offline-datastore.js' +import { DHTDatastore } from './dht-datastore.js' + +/** + * @typedef {import('interface-datastore').Datastore} Datastore + */ /** * @param {object} arg @@ -12,6 +17,7 @@ import { OfflineDatastore } from './offline-datastore.js' */ export function createRouting ({ libp2p, repo, peerId, options }) { // Setup online routing for IPNS with a tiered routing composed by a DHT and a Pubsub router (if properly enabled) + /** @type {any[]} */ const ipnsStores = [] // Add IPNS pubsub if enabled @@ -25,11 +31,11 @@ export function createRouting ({ libp2p, repo, peerId, options }) { } // DHT should not be added as routing if we are offline or it is disabled - if (get(options, 'offline') || !get(options, 'libp2p.config.dht.enabled', false)) { + if (get(options, 'offline') || get(options, 'config.Routing.Type', 'none') === 'none') { const offlineDatastore = new OfflineDatastore(repo) ipnsStores.push(offlineDatastore) } else { - ipnsStores.push(libp2p._dht) + ipnsStores.push(new DHTDatastore(libp2p._dht)) } // Create ipns routing with a set of datastores diff --git a/packages/ipfs-core/src/ipns/routing/dht-datastore.js b/packages/ipfs-core/src/ipns/routing/dht-datastore.js new file mode 100644 index 0000000000..b25f48301a --- /dev/null +++ b/packages/ipfs-core/src/ipns/routing/dht-datastore.js @@ -0,0 +1,43 @@ +import drain from 'it-drain' +import { notFoundError } from 'datastore-core/errors' +import debug from 'debug' + +const log = Object.assign(debug('ipfs:ipns:dht-datastore'), { + error: debug('ipfs:ipns:dht-datastore:error') +}) + +export class DHTDatastore { + /** + * + * @param {import('libp2p-kad-dht/src/types').DHT} dht + */ + constructor (dht) { + this._dht = dht + } + + /** + * @param {Uint8Array} key - identifier of the value. + * @param {Uint8Array} value - value to be stored. + */ + async put (key, value) { + try { + await drain(this._dht.put(key, value)) + } catch (/** @type {any} */ err) { + log.error(err) + throw err + } + } + + /** + * @param {Uint8Array} key - identifier of the value to be obtained. + */ + async get (key) { + for await (const event of this._dht.get(key)) { + if (event.name === 'VALUE') { + return event.value + } + } + + throw notFoundError() + } +} diff --git a/packages/ipfs-core/src/ipns/routing/offline-datastore.js b/packages/ipfs-core/src/ipns/routing/offline-datastore.js index 31a39ac31b..817221e026 100644 --- a/packages/ipfs-core/src/ipns/routing/offline-datastore.js +++ b/packages/ipfs-core/src/ipns/routing/offline-datastore.js @@ -89,6 +89,6 @@ export class OfflineDatastore { * @param {Uint8Array} key */ _routingKey (key) { - return new Key('/' + uint8ArrayToString(key, 'base32upper'), false) + return new Key('/dht/record/' + uint8ArrayToString(key, 'base32'), false) } } diff --git a/packages/ipfs-core/src/ipns/routing/pubsub-datastore.js b/packages/ipfs-core/src/ipns/routing/pubsub-datastore.js index 0421cb73db..c82b2b4c14 100644 --- a/packages/ipfs-core/src/ipns/routing/pubsub-datastore.js +++ b/packages/ipfs-core/src/ipns/routing/pubsub-datastore.js @@ -34,9 +34,14 @@ export class IpnsPubsubDatastore { * @param {Uint8Array} key - identifier of the value. * @param {Uint8Array} value - value to be stored. */ - put (key, value) { - // @ts-ignore datastores take Key keys, this one takes Uint8Array keys - return this._pubsubDs.put(key, value) + async put (key, value) { + try { + // @ts-ignore datastores take Key keys, this one takes Uint8Array keys + await this._pubsubDs.put(key, value) + } catch (/** @type {any} */ err) { + log.error(err) + throw err + } } /** diff --git a/packages/ipfs-core/test/libp2p.spec.js b/packages/ipfs-core/test/libp2p.spec.js index 4d93830ddf..8cae6c53bb 100644 --- a/packages/ipfs-core/test/libp2p.spec.js +++ b/packages/ipfs-core/test/libp2p.spec.js @@ -6,7 +6,7 @@ import PeerId from 'peer-id' import Libp2p from 'libp2p' import { EventEmitter } from 'events' import { createLibp2p as libp2pComponent } from '../src/components/libp2p.js' -import { NOISE as Crypto } from '@chainsafe/libp2p-noise' +import { NOISE as Crypto } from '@achingbrain/libp2p-noise' import gossipsub from 'libp2p-gossipsub' /** diff --git a/packages/ipfs-core/test/name.spec.js b/packages/ipfs-core/test/name.spec.js index 0211465158..0b3e52c833 100644 --- a/packages/ipfs-core/test/name.spec.js +++ b/packages/ipfs-core/test/name.spec.js @@ -299,18 +299,16 @@ describe('name', function () { // @ts-expect-error sinon.stub() is not complete implementation peerId: sinon.stub(), options: { - libp2p: { - config: { - dht: { - enabled: true - } + config: { + Routing: { + Type: 'dhtclient' } } } }) expect(config.stores).to.have.lengthOf(1) - expect(config.stores[0]).to.eql(dht) + expect(config.stores).to.have.deep.nested.property('[0]._dht', dht) }) }) }) diff --git a/packages/ipfs-daemon/package.json b/packages/ipfs-daemon/package.json index 6e62aebc25..513b69d054 100644 --- a/packages/ipfs-daemon/package.json +++ b/packages/ipfs-daemon/package.json @@ -52,8 +52,8 @@ "ipfs-http-server": "^0.9.2", "ipfs-utils": "^9.0.2", "just-safe-set": "^2.2.1", - "libp2p": "^0.33.0", - "libp2p-webrtc-star": "^0.24.0" + "libp2p": "^0.35.0", + "libp2p-webrtc-star": "^0.25.0" }, "devDependencies": { "aegir": "^36.0.1", @@ -62,8 +62,7 @@ }, "optionalDependencies": { "electron-webrtc": "^0.3.0", - "prom-client": "^12.0.0", - "prometheus-gc-stats": "^0.6.0", + "prom-client": "^14.0.1", "wrtc": "^0.4.6" } } diff --git a/packages/ipfs-daemon/src/index.js b/packages/ipfs-daemon/src/index.js index f914384baa..fc469b881f 100644 --- a/packages/ipfs-daemon/src/index.js +++ b/packages/ipfs-daemon/src/index.js @@ -8,8 +8,6 @@ import { HttpGateway } from 'ipfs-http-gateway' import { createServer as gRPCServer } from 'ipfs-grpc-server' import { isElectron } from 'ipfs-utils/src/env.js' import prometheusClient from 'prom-client' -// @ts-expect-error - no types -import prometheusGcStats from 'prometheus-gc-stats' import Libp2p from 'libp2p' const log = debug('ipfs:daemon') @@ -23,10 +21,7 @@ export class Daemon { if (process.env.IPFS_MONITORING) { // Setup debug metrics collection - const collectDefaultMetrics = prometheusClient.collectDefaultMetrics - // @ts-ignore - timeout isn't in typedefs - collectDefaultMetrics({ timeout: 5000 }) - prometheusGcStats(prometheusClient.register)() + prometheusClient.collectDefaultMetrics() } /** @type {import('ipfs-core-types').IPFS} */ diff --git a/packages/ipfs-grpc-client/package.json b/packages/ipfs-grpc-client/package.json index 55d92d9acd..4d6371af18 100644 --- a/packages/ipfs-grpc-client/package.json +++ b/packages/ipfs-grpc-client/package.json @@ -71,6 +71,6 @@ "aegir": "^36.0.1", "it-all": "^1.0.4", "rimraf": "^3.0.2", - "sinon": "^11.1.1" + "sinon": "^12.0.01" } } diff --git a/packages/ipfs-grpc-server/package.json b/packages/ipfs-grpc-server/package.json index c9a4671a33..143c60661c 100644 --- a/packages/ipfs-grpc-server/package.json +++ b/packages/ipfs-grpc-server/package.json @@ -66,7 +66,7 @@ "it-all": "^1.0.4", "it-drain": "^1.0.3", "rimraf": "^3.0.2", - "sinon": "^11.1.1", + "sinon": "^12.0.01", "uint8arrays": "^3.0.0" } } diff --git a/packages/ipfs-http-client/src/dht/find-peer.js b/packages/ipfs-http-client/src/dht/find-peer.js index 7077095f6a..22c76b5638 100644 --- a/packages/ipfs-http-client/src/dht/find-peer.js +++ b/packages/ipfs-http-client/src/dht/find-peer.js @@ -1,7 +1,6 @@ -import { Multiaddr } from 'multiaddr' import { configure } from '../lib/configure.js' import { toUrlSearchParams } from '../lib/to-url-search-params.js' -import { FinalPeer } from './response-types.js' +import { mapEvent } from './map-event.js' /** * @typedef {import('../types').HTTPClientExtraOptions} HTTPClientExtraOptions @@ -12,7 +11,7 @@ export const createFindPeer = configure(api => { /** * @type {DHTAPI["findPeer"]} */ - async function findPeer (peerId, options = {}) { + async function * findPeer (peerId, options = {}) { const res = await api.post('dht/findpeer', { signal: options.signal, searchParams: toUrlSearchParams({ @@ -22,17 +21,9 @@ export const createFindPeer = configure(api => { headers: options.headers }) - for await (const data of res.ndjson()) { - if (data.Type === FinalPeer && data.Responses) { - const { ID, Addrs } = data.Responses[0] - return { - id: ID, - addrs: (Addrs || []).map((/** @type {string} **/ a) => new Multiaddr(a)) - } - } + for await (const event of res.ndjson()) { + yield mapEvent(event) } - - throw new Error('not found') } return findPeer diff --git a/packages/ipfs-http-client/src/dht/find-provs.js b/packages/ipfs-http-client/src/dht/find-provs.js index 915d511ea9..46f0312cde 100644 --- a/packages/ipfs-http-client/src/dht/find-provs.js +++ b/packages/ipfs-http-client/src/dht/find-provs.js @@ -1,7 +1,6 @@ -import { Multiaddr } from 'multiaddr' import { configure } from '../lib/configure.js' import { toUrlSearchParams } from '../lib/to-url-search-params.js' -import { Provider } from './response-types.js' +import { mapEvent } from './map-event.js' /** * @typedef {import('../types').HTTPClientExtraOptions} HTTPClientExtraOptions @@ -22,15 +21,8 @@ export const createFindProvs = configure(api => { headers: options.headers }) - for await (const message of res.ndjson()) { - if (message.Type === Provider && message.Responses) { - for (const { ID, Addrs } of message.Responses) { - yield { - id: ID, - addrs: (Addrs || []).map((/** @type {string} **/ a) => new Multiaddr(a)) - } - } - } + for await (const event of res.ndjson()) { + yield mapEvent(event) } } diff --git a/packages/ipfs-http-client/src/dht/get.js b/packages/ipfs-http-client/src/dht/get.js index 651978692d..db9dac5938 100644 --- a/packages/ipfs-http-client/src/dht/get.js +++ b/packages/ipfs-http-client/src/dht/get.js @@ -1,7 +1,6 @@ import { configure } from '../lib/configure.js' import { toUrlSearchParams } from '../lib/to-url-search-params.js' -import { Value } from './response-types.js' -import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' +import { mapEvent } from './map-event.js' import { toString as uint8ArrayToString } from 'uint8arrays/to-string' /** @@ -13,23 +12,20 @@ export const createGet = configure(api => { /** * @type {DHTAPI["get"]} */ - async function get (key, options = {}) { + async function * get (key, options = {}) { const res = await api.post('dht/get', { signal: options.signal, searchParams: toUrlSearchParams({ - arg: key instanceof Uint8Array ? uint8ArrayToString(key) : key, + // arg: base36.encode(key), + arg: key instanceof Uint8Array ? uint8ArrayToString(key) : key.toString(), ...options }), headers: options.headers }) - for await (const message of res.ndjson()) { - if (message.Type === Value) { - return uint8ArrayFromString(message.Extra, 'base64pad') - } + for await (const event of res.ndjson()) { + yield mapEvent(event) } - - throw new Error('not found') } return get diff --git a/packages/ipfs-http-client/src/dht/map-event.js b/packages/ipfs-http-client/src/dht/map-event.js new file mode 100644 index 0000000000..af641d0421 --- /dev/null +++ b/packages/ipfs-http-client/src/dht/map-event.js @@ -0,0 +1,119 @@ +import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' +import { + SendingQuery, + PeerResponse, + FinalPeer, + QueryError, + Provider, + Value, + AddingPeer, + DialingPeer +} from './response-types.js' +import { Multiaddr } from 'multiaddr' + +/** + * @param {{Type: number, ID: string, Extra: string, Responses: {ID: string, Addrs: string[]}[]}} event + * @returns {import('ipfs-core-types/src/dht').QueryEvent} + */ +export const mapEvent = (event) => { + // console.info(JSON.stringify(event, null, 2)) + + if (event.Type === SendingQuery) { + return { + to: event.ID, + name: 'SENDING_QUERY', + type: event.Type + } + } + + if (event.Type === PeerResponse) { + return { + from: event.ID, + name: 'PEER_RESPONSE', + type: event.Type, + // TODO: how to infer this from the go-ipfs response + messageType: 0, + // TODO: how to infer this from the go-ipfs response + messageName: 'PUT_VALUE', + closer: (event.Responses || []).map(({ ID, Addrs }) => ({ id: ID, multiaddrs: Addrs.map(addr => new Multiaddr(addr)) })), + providers: (event.Responses || []).map(({ ID, Addrs }) => ({ id: ID, multiaddrs: Addrs.map(addr => new Multiaddr(addr)) })) + // TODO: how to infer this from the go-ipfs response + // record: ??? + } + } + + if (event.Type === FinalPeer) { + // dht.query ends with a FinalPeer event with no Responses + let peer = { + id: event.ID, + /** @type {Multiaddr[]} */ + multiaddrs: [] + } + + if (event.Responses && event.Responses.length) { + // dht.findPeer has the result in the Responses field + peer = { + id: event.Responses[0].ID, + multiaddrs: event.Responses[0].Addrs.map(addr => new Multiaddr(addr)) + } + } + + return { + from: event.ID, + name: 'FINAL_PEER', + type: event.Type, + peer + } + } + + if (event.Type === QueryError) { + return { + from: event.ID, + name: 'QUERY_ERROR', + type: event.Type, + error: new Error(event.Extra) + } + } + + if (event.Type === Provider) { + return { + from: event.ID, + name: 'PROVIDER', + type: event.Type, + providers: event.Responses.map(({ ID, Addrs }) => ({ id: ID, multiaddrs: Addrs.map(addr => new Multiaddr(addr)) })) + } + } + + if (event.Type === Value) { + return { + from: event.ID, + name: 'VALUE', + type: event.Type, + value: uint8ArrayFromString(event.Extra, 'base64pad') + } + } + + if (event.Type === AddingPeer) { + const peers = event.Responses.map(({ ID }) => ID) + + if (!peers.length) { + throw new Error('No peer found') + } + + return { + name: 'ADDING_PEER', + type: event.Type, + peer: peers[0] + } + } + + if (event.Type === DialingPeer) { + return { + name: 'DIALING_PEER', + type: event.Type, + peer: event.ID + } + } + + throw new Error('Unknown DHT event type') +} diff --git a/packages/ipfs-http-client/src/dht/provide.js b/packages/ipfs-http-client/src/dht/provide.js index 71b23e5036..baeefbb2c6 100644 --- a/packages/ipfs-http-client/src/dht/provide.js +++ b/packages/ipfs-http-client/src/dht/provide.js @@ -1,7 +1,6 @@ -import { Multiaddr } from 'multiaddr' -import { objectToCamel } from '../lib/object-to-camel.js' import { configure } from '../lib/configure.js' import { toUrlSearchParams } from '../lib/to-url-search-params.js' +import { mapEvent } from './map-event.js' /** * @typedef {import('../types').HTTPClientExtraOptions} HTTPClientExtraOptions @@ -26,17 +25,8 @@ export const createProvide = configure(api => { headers: options.headers }) - for await (let message of res.ndjson()) { - message = objectToCamel(message) - if (message.responses) { - message.responses = message.responses.map((/** @type {{ ID: string, Addrs: string[] }} */ { ID, Addrs }) => ({ - id: ID, - addrs: (Addrs || []).map((/** @type {string} **/ a) => new Multiaddr(a)) - })) - } else { - message.responses = [] - } - yield message + for await (const event of res.ndjson()) { + yield mapEvent(event) } } diff --git a/packages/ipfs-http-client/src/dht/put.js b/packages/ipfs-http-client/src/dht/put.js index 259a70884a..807899df35 100644 --- a/packages/ipfs-http-client/src/dht/put.js +++ b/packages/ipfs-http-client/src/dht/put.js @@ -1,11 +1,10 @@ -import { Multiaddr } from 'multiaddr' -import { objectToCamel } from '../lib/object-to-camel.js' import { configure } from '../lib/configure.js' import { toUrlSearchParams } from '../lib/to-url-search-params.js' import { multipartRequest } from 'ipfs-core-utils/multipart-request' import { abortSignal } from '../lib/abort-signal.js' import { AbortController } from 'native-abort-controller' import { toString as uint8ArrayToString } from 'uint8arrays/to-string' +import { mapEvent } from './map-event.js' /** * @typedef {import('../types').HTTPClientExtraOptions} HTTPClientExtraOptions @@ -24,7 +23,7 @@ export const createPut = configure(api => { const res = await api.post('dht/put', { signal, searchParams: toUrlSearchParams({ - arg: uint8ArrayToString(key), + arg: key instanceof Uint8Array ? uint8ArrayToString(key) : key.toString(), ...options }), ...( @@ -32,15 +31,8 @@ export const createPut = configure(api => { ) }) - for await (let message of res.ndjson()) { - message = objectToCamel(message) - if (message.responses) { - message.responses = message.responses.map((/** @type {{ ID: string, Addrs: string[] }} */ { ID, Addrs }) => ({ - id: ID, - addrs: (Addrs || []).map(a => new Multiaddr(a)) - })) - } - yield message + for await (const event of res.ndjson()) { + yield mapEvent(event) } } diff --git a/packages/ipfs-http-client/src/dht/query.js b/packages/ipfs-http-client/src/dht/query.js index 0923f44bba..3f4465eae7 100644 --- a/packages/ipfs-http-client/src/dht/query.js +++ b/packages/ipfs-http-client/src/dht/query.js @@ -1,7 +1,6 @@ -import { Multiaddr } from 'multiaddr' -import { objectToCamel } from '../lib/object-to-camel.js' import { configure } from '../lib/configure.js' import { toUrlSearchParams } from '../lib/to-url-search-params.js' +import { mapEvent } from './map-event.js' /** * @typedef {import('../types').HTTPClientExtraOptions} HTTPClientExtraOptions @@ -22,13 +21,8 @@ export const createQuery = configure(api => { headers: options.headers }) - for await (let message of res.ndjson()) { - message = objectToCamel(message) - message.responses = (message.responses || []).map((/** @type {{ ID: string, Addrs: string[] }} */ { ID, Addrs }) => ({ - id: ID, - addrs: (Addrs || []).map((/** @type {string} **/ a) => new Multiaddr(a)) - })) - yield message + for await (const event of res.ndjson()) { + yield mapEvent(event) } } diff --git a/packages/ipfs-http-client/src/dht/response-types.js b/packages/ipfs-http-client/src/dht/response-types.js index 24250e4a14..a30e846cbd 100644 --- a/packages/ipfs-http-client/src/dht/response-types.js +++ b/packages/ipfs-http-client/src/dht/response-types.js @@ -1,6 +1,6 @@ // Response types are defined here = -// https =//github.com/libp2p/go-libp2p-core/blob/6e566d10f4a5447317a66d64c7459954b969bdab/routing/query.go#L15-L24 +// https://github.com/libp2p/go-libp2p-core/blob/6e566d10f4a5447317a66d64c7459954b969bdab/routing/query.go#L15-L24 export const SendingQuery = 0 export const PeerResponse = 1 export const FinalPeer = 2 diff --git a/packages/ipfs-http-gateway/package.json b/packages/ipfs-http-gateway/package.json index 3d3dfeef87..dc1d7f6949 100644 --- a/packages/ipfs-http-gateway/package.json +++ b/packages/ipfs-http-gateway/package.json @@ -79,6 +79,6 @@ "aegir": "^36.0.1", "file-type": "^16.0.0", "rimraf": "^3.0.2", - "sinon": "^11.1.1" + "sinon": "^12.0.01" } } diff --git a/packages/ipfs-http-server/package.json b/packages/ipfs-http-server/package.json index 81d7dc4025..d875c43cae 100644 --- a/packages/ipfs-http-server/package.json +++ b/packages/ipfs-http-server/package.json @@ -50,6 +50,7 @@ "@hapi/hapi": "^20.0.0", "@ipld/dag-pb": "^2.1.3", "abort-controller": "^3.0.0", + "any-signal": "^2.1.2", "debug": "^4.1.1", "dlv": "^1.1.3", "err-code": "^3.0.1", @@ -76,6 +77,7 @@ "native-abort-controller": "^1.0.3", "parse-duration": "^1.0.0", "stream-to-it": "^0.2.2", + "timeout-abort-controller": "^2.0.0", "uint8arrays": "^3.0.0", "uri-to-multiaddr": "^6.0.0" }, @@ -85,15 +87,15 @@ "aegir": "^36.0.1", "form-data": "^4.0.0", "ipfs-http-client": "^54.0.2", - "iso-random-stream": "^2.0.0", + "iso-random-stream": "^2.0.2", "it-to-buffer": "^2.0.0", "npm-run-all": "^4.1.5", "qs": "^6.9.4", "rimraf": "^3.0.2", - "sinon": "^11.1.1", + "sinon": "^12.0.01", "stream-to-promise": "^3.0.0" }, "optionalDependencies": { - "prom-client": "^12.0.0" + "prom-client": "^14.0.1" } } diff --git a/packages/ipfs-http-server/src/api/resources/dht.js b/packages/ipfs-http-server/src/api/resources/dht.js index 3ed16e8c17..3f31a0e31a 100644 --- a/packages/ipfs-http-server/src/api/resources/dht.js +++ b/packages/ipfs-http-server/src/api/resources/dht.js @@ -1,8 +1,68 @@ import Joi from '../../utils/joi.js' -import Boom from '@hapi/boom' -import { pipe } from 'it-pipe' -import map from 'it-map' import { streamResponse } from '../../utils/stream-response.js' +import { TimeoutController } from 'timeout-abort-controller' +import { anySignal } from 'any-signal' +import { toString as uint8ArrayToString } from 'uint8arrays/to-string' +import { multipartRequestParser } from '../../utils/multipart-request-parser.js' +import all from 'it-all' +import Boom from '@hapi/boom' + +/** + * @typedef {import('ipfs-core-types/src/dht').QueryEvent} QueryEvent + * @typedef {import('peer-id')} PeerId + */ + +/** + * @param {string} id + * @param {QueryEvent} event + */ +function mapQueryEvent (id, event) { + let extra = '' + const type = event.type + let responses = null + + if (event.name === 'SENDING_QUERY') { + id = event.to + } else if (event.name === 'PEER_RESPONSE') { + id = event.from + responses = event.closer.map(peerData => ({ + ID: peerData.id, + Addrs: peerData.multiaddrs + })) + } else if (event.name === 'QUERY_ERROR') { + id = event.from + extra = event.error.message + } else if (event.name === 'PROVIDER') { + id = event.from + responses = event.providers.map(peerData => ({ + ID: peerData.id, + Addrs: peerData.multiaddrs + })) + } else if (event.name === 'VALUE') { + id = event.from + extra = uint8ArrayToString(event.value, 'base64pad') + } else if (event.name === 'ADDING_PEER') { + responses = [{ + ID: event.peer, + Addrs: [] + }] + } else if (event.name === 'DIALING_PEER') { + id = event.peer + } else if (event.name === 'FINAL_PEER') { + id = event.peer.id + responses = [{ + ID: event.peer.id, + Addrs: event.peer.multiaddrs + }] + } + + return { + Extra: extra, + ID: id, + Type: type, + Responses: responses + } +} export const findPeerResource = { options: { @@ -42,27 +102,32 @@ export const findPeerResource = { } } = request - let res + const signals = [signal] + /** @type {TimeoutController | undefined} */ + let timeoutController - try { - res = await ipfs.dht.findPeer(peerId, { - signal, - timeout - }) - } catch (/** @type {any} */ err) { - if (err.code === 'ERR_LOOKUP_FAILED') { - throw Boom.notFound(err.toString()) - } else { - throw Boom.boomify(err, { message: err.toString() }) - } + if (timeout != null) { + timeoutController = new TimeoutController(timeout) + signals.push(timeoutController.signal) } - return h.response({ - Responses: [{ - ID: res.id.toString(), - Addrs: (res.addrs || []).map(a => a.toString()) - }], - Type: 2 + const id = await ipfs.id({ + signal, + timeout + }) + + return streamResponse(request, h, () => { + return (async function * () { + for await (const event of ipfs.dht.findPeer(peerId, { + signal: anySignal(signals) + })) { + yield mapQueryEvent(id.id, event) + } + + if (timeoutController) { + timeoutController.clear() + } + }()) }) } } @@ -94,7 +159,7 @@ export const findProvsResource = { * @param {import('../../types').Request} request * @param {import('@hapi/hapi').ResponseToolkit} h */ - handler (request, h) { + async handler (request, h) { const { app: { signal @@ -111,25 +176,44 @@ export const findProvsResource = { } } = request + const signals = [signal] + /** @type {TimeoutController | undefined} */ + let timeoutController + + if (timeout != null) { + timeoutController = new TimeoutController(timeout) + signals.push(timeoutController.signal) + } + + const id = await ipfs.id({ + signal, + timeout + }) + + const providers = new Set() + return streamResponse(request, h, () => { - return pipe( - ipfs.dht.findProvs(cid, { - numProviders, - signal, - timeout - }), - async function * (source) { - yield * map(source, ({ id, addrs }) => { - return { - Responses: [{ - ID: id.toString(), - Addrs: (addrs || []).map(a => a.toString()) - }], - Type: 4 - } - }) + return (async function * () { + for await (const event of ipfs.dht.findProvs(cid, { + signal: anySignal(signals) + })) { + if (event.name === 'PROVIDER') { + event.providers.forEach(peerData => { + providers.add(peerData.id) + }) + } + + yield mapQueryEvent(id.id, event) + + if (providers.size >= numProviders) { + break + } + } + + if (timeoutController) { + timeoutController.clear() } - ) + }()) }) } } @@ -142,10 +226,10 @@ export const getResource = { stripUnknown: true }, query: Joi.object().keys({ - buffer: Joi.binary().required(), + key: Joi.string().required(), timeout: Joi.timeout() }) - .rename('arg', 'buffer', { + .rename('arg', 'key', { override: true, ignoreUndefined: true }) @@ -167,19 +251,37 @@ export const getResource = { } }, query: { - buffer, + key, timeout } } = request - const res = await ipfs.dht.get(buffer, { + const signals = [signal] + /** @type {TimeoutController | undefined} */ + let timeoutController + + if (timeout != null) { + timeoutController = new TimeoutController(timeout) + signals.push(timeoutController.signal) + } + + const id = await ipfs.id({ signal, timeout }) - return h.response({ - Extra: res.toString(), - Type: 5 + return streamResponse(request, h, () => { + return (async function * () { + for await (const event of ipfs.dht.get(key, { + signal: anySignal(signals) + })) { + yield mapQueryEvent(id.id, event) + } + + if (timeoutController) { + timeoutController.clear() + } + }()) }) } } @@ -222,26 +324,87 @@ export const provideResource = { } } = request - await ipfs.dht.provide(cid, { + const signals = [signal] + /** @type {TimeoutController | undefined} */ + let timeoutController + + if (timeout != null) { + timeoutController = new TimeoutController(timeout) + signals.push(timeoutController.signal) + } + + const id = await ipfs.id({ signal, timeout }) - return h.response() + return streamResponse(request, h, () => { + return (async function * () { + for await (const event of ipfs.dht.provide(cid, { + signal: anySignal(signals) + })) { + yield mapQueryEvent(id.id, event) + } + + if (timeoutController) { + timeoutController.clear() + } + }()) + }) } } export const putResource = { options: { + payload: { + parse: false, + output: 'stream' + }, + pre: [{ + assign: 'args', + /** + * @param {import('../../types').Request} request + * @param {import('@hapi/hapi').ResponseToolkit} _h + */ + method: async (request, _h) => { + if (!request.payload) { + throw Boom.badRequest("Argument 'file' is required") + } + + let value + + for await (const part of multipartRequestParser(request.raw.req)) { + if (part.type !== 'file') { + continue + } + + value = Buffer.concat(await all(part.content)) + } + + if (!value) { + throw Boom.badRequest("Argument 'file' is required") + } + + try { + return { value } + } catch (/** @type {any} */ err) { + throw Boom.boomify(err, { message: 'Failed to decode file as config' }) + } + } + }], validate: { options: { allowUnknown: true, stripUnknown: true }, query: Joi.object().keys({ - arg: Joi.array().length(2).items(Joi.binary()).required(), + key: Joi.string().required(), timeout: Joi.timeout() }) + .rename('arg', 'key', { + override: true, + ignoreUndefined: true + }) } }, @@ -259,21 +422,44 @@ export const putResource = { ipfs } }, - query: { - arg: [ - key, + pre: { + args: { value - ], + } + }, + query: { + key, timeout } } = request - await ipfs.dht.put(key, value, { + const signals = [signal] + /** @type {TimeoutController | undefined} */ + let timeoutController + + if (timeout != null) { + timeoutController = new TimeoutController(timeout) + signals.push(timeoutController.signal) + } + + const id = await ipfs.id({ signal, timeout }) - return h.response() + return streamResponse(request, h, () => { + return (async function * () { + for await (const event of ipfs.dht.put(key, value, { + signal: anySignal(signals) + })) { + yield mapQueryEvent(id.id, event) + } + + if (timeoutController) { + timeoutController.clear() + } + }()) + }) } } @@ -285,10 +471,10 @@ export const queryResource = { stripUnknown: true }, query: Joi.object().keys({ - peerId: Joi.string().required(), + key: Joi.string().required(), timeout: Joi.timeout() }) - .rename('arg', 'peerId', { + .rename('arg', 'key', { override: true, ignoreUndefined: true }) @@ -299,7 +485,7 @@ export const queryResource = { * @param {import('../../types').Request} request * @param {import('@hapi/hapi').ResponseToolkit} h */ - handler (request, h) { + async handler (request, h) { const { app: { signal @@ -310,21 +496,37 @@ export const queryResource = { } }, query: { - peerId, + key, timeout } } = request + const signals = [signal] + /** @type {TimeoutController | undefined} */ + let timeoutController + + if (timeout != null) { + timeoutController = new TimeoutController(timeout) + signals.push(timeoutController.signal) + } + + const id = await ipfs.id({ + signal, + timeout + }) + return streamResponse(request, h, () => { - return pipe( - ipfs.dht.query(peerId, { - signal, - timeout - }), - async function * (source) { - yield * map(source, ({ id }) => ({ ID: id.toString() })) + return (async function * () { + for await (const event of ipfs.dht.query(key, { + signal: anySignal(signals) + })) { + yield mapQueryEvent(id.id, event) + } + + if (timeoutController) { + timeoutController.clear() } - ) + }()) }) } } diff --git a/packages/ipfs-http-server/src/index.js b/packages/ipfs-http-server/src/index.js index 3beccca8df..b168d091e5 100644 --- a/packages/ipfs-http-server/src/index.js +++ b/packages/ipfs-http-server/src/index.js @@ -242,6 +242,11 @@ export class HttpApi { const controller = new AbortController() request.app.signal = controller.signal + // abort the request if the client disconnects + request.raw.res.once('close', () => { + controller.abort() + }) + // abort the request if the client disconnects request.events.once('disconnect', () => { controller.abort() diff --git a/packages/ipfs-http-server/src/utils/stream-response.js b/packages/ipfs-http-server/src/utils/stream-response.js index 7b0ac7c94e..0ab77733cc 100644 --- a/packages/ipfs-http-server/src/utils/stream-response.js +++ b/packages/ipfs-http-server/src/utils/stream-response.js @@ -40,11 +40,6 @@ export async function streamResponse (request, h, getSource, options = {}) { if (options.onEnd) { options.onEnd() } - - if (!started) { // Maybe it was an empty source? - started = true - resolve(stream) - } } catch (/** @type {any} */ err) { log(err) @@ -61,7 +56,15 @@ export async function streamResponse (request, h, getSource, options = {}) { }) } - throw err + reject(err) + } finally { + if (!started) { // Maybe it was an empty source? + started = true + resolve(stream) + } + + // close the stream as we may have aborted execution during a yield + stream.end() } })(), toIterable.sink(stream) diff --git a/packages/ipfs-http-server/test/inject/dht.js b/packages/ipfs-http-server/test/inject/dht.js index 004d2a24e7..f65c9fd0d7 100644 --- a/packages/ipfs-http-server/test/inject/dht.js +++ b/packages/ipfs-http-server/test/inject/dht.js @@ -9,6 +9,9 @@ import errCode from 'err-code' import { CID } from 'multiformats/cid' import { AbortSignal } from 'native-abort-controller' import { allNdjson } from '../utils/all-ndjson.js' +import { toString as uint8ArrayToString } from 'uint8arrays/to-string' +import FormData from 'form-data' +import streamToPromise from 'stream-to-promise' describe('/dht', () => { const peerId = 'QmQ2zigjQikYnyYUSXZydNXrDRhBut2mubwJBaLXobMt3A' @@ -17,6 +20,7 @@ describe('/dht', () => { beforeEach(() => { ipfs = { + id: sinon.stub().resolves({ id: 'Qmfoo' }), dht: { findPeer: sinon.stub(), findProvs: sinon.stub(), @@ -30,8 +34,7 @@ describe('/dht', () => { describe('/findpeer', () => { const defaultOptions = { - signal: sinon.match.instanceOf(AbortSignal), - timeout: undefined + signal: sinon.match.instanceOf(AbortSignal) } it('only accepts POST', () => { @@ -48,7 +51,7 @@ describe('/dht', () => { expect(res).to.have.nested.property('result.Code', 1) }) - it('returns 404 if peerId is provided and there are no peers in the routing table', async () => { + it('returns 500 if peerId is provided and there are no peers in the routing table', async () => { ipfs.dht.findPeer.withArgs(peerId, defaultOptions).throws(errCode(new Error('Nope'), 'ERR_LOOKUP_FAILED')) const res = await http({ @@ -56,34 +59,15 @@ describe('/dht', () => { url: `/api/v0/dht/findpeer?arg=${peerId}` }, { ipfs }) - expect(res).to.have.property('statusCode', 404) + expect(res).to.have.property('statusCode', 500) expect(ipfs.dht.findPeer.called).to.be.true() expect(ipfs.dht.findPeer.getCall(0).args[0]).to.equal(peerId) }) - - it('accepts a timeout', async () => { - ipfs.dht.findPeer.withArgs(peerId, { - ...defaultOptions, - timeout: 1000 - }).returns({ - id: peerId, - addrs: [] - }) - - const res = await http({ - method: 'POST', - url: `/api/v0/dht/findpeer?arg=${peerId}&timeout=1s` - }, { ipfs }) - - expect(res).to.have.property('statusCode', 200) - }) }) describe('/findprovs', () => { const defaultOptions = { - numProviders: 20, - signal: sinon.match.instanceOf(AbortSignal), - timeout: undefined + signal: sinon.match.instanceOf(AbortSignal) } it('only accepts POST', async () => { @@ -107,10 +91,14 @@ describe('/dht', () => { it('returns 200 if key is provided', async () => { ipfs.dht.findProvs.withArgs(cid, defaultOptions).returns([{ - id: peerId, - addrs: [ - 'addr' - ] + name: 'PROVIDER', + type: 4, + providers: [{ + id: peerId, + multiaddrs: [ + 'addr' + ] + }] }]) const res = await http({ @@ -120,6 +108,7 @@ describe('/dht', () => { expect(res).to.have.property('statusCode', 200) expect(allNdjson(res)).to.deep.equal([{ + Extra: '', Type: 4, Responses: [{ ID: peerId, @@ -129,62 +118,52 @@ describe('/dht', () => { }) it('overrides num-providers', async () => { - ipfs.dht.findProvs.withArgs(cid, { - ...defaultOptions, - numProviders: 10 - }).returns([{ - id: peerId, - addrs: [ + const providers = new Array(20).fill(0).map((val, i) => ({ + id: peerId + i, + multiaddrs: [ 'addr' ] - }]) - - const res = await http({ - method: 'POST', - url: `/api/v0/dht/findprovs?arg=${cid}&num-providers=10` - }, { ipfs }) - - expect(res).to.have.property('statusCode', 200) - expect(allNdjson(res)).to.deep.equal([{ - Type: 4, - Responses: [{ - ID: peerId, - Addrs: ['addr'] - }] - }]) - }) + })) - it('accepts a timeout', async () => { ipfs.dht.findProvs.withArgs(cid, { - ...defaultOptions, - timeout: 1000 + ...defaultOptions }).returns([{ - id: peerId, - addrs: [ - 'addr' - ] + name: 'PROVIDER', + type: 4, + providers: providers.slice(0, 4) + }, { + name: 'PROVIDER', + type: 4, + providers: providers.slice(4, 8) + }, { + name: 'PROVIDER', + type: 4, + providers: providers.slice(8, 12) + }, { + name: 'PROVIDER', + type: 4, + providers: providers.slice(12) }]) const res = await http({ method: 'POST', - url: `/api/v0/dht/findprovs?arg=${cid}&timeout=1s` + url: `/api/v0/dht/findprovs?arg=${cid}&num-providers=10` }, { ipfs }) expect(res).to.have.property('statusCode', 200) - expect(allNdjson(res)).to.deep.equal([{ - Type: 4, - Responses: [{ - ID: peerId, - Addrs: ['addr'] - }] - }]) + + const provs = allNdjson(res).map(event => event.Responses).reduce((acc, curr) => { + return acc.concat(...curr) + }, []) + + // should ignore subsequent providers after reaching limit + expect(provs).to.have.lengthOf(12) }) }) describe('/get', () => { const defaultOptions = { - signal: sinon.match.instanceOf(AbortSignal), - timeout: undefined + signal: sinon.match.instanceOf(AbortSignal) } it('only accepts POST', async () => { @@ -208,8 +187,12 @@ describe('/dht', () => { it('returns 200 if key is provided', async () => { const key = 'key' - const value = 'value' - ipfs.dht.get.withArgs(Buffer.from(key), defaultOptions).returns(value) + const value = Buffer.from('hello world') + ipfs.dht.get.withArgs(key, defaultOptions).returns([{ + type: 5, + name: 'VALUE', + value: value + }]) const res = await http({ method: 'POST', @@ -217,33 +200,17 @@ describe('/dht', () => { }, { ipfs }) expect(res).to.have.property('statusCode', 200) - expect(res).to.have.nested.property('result.Type', 5) - expect(res).to.have.nested.property('result.Extra', value) - }) - - it('accepts a timeout', async () => { - const key = 'key' - const value = 'value' - ipfs.dht.get.withArgs(Buffer.from(key), { - ...defaultOptions, - timeout: 1000 - }).returns(value) - - const res = await http({ - method: 'POST', - url: `/api/v0/dht/get?arg=${key}&timeout=1s` - }, { ipfs }) - - expect(res).to.have.property('statusCode', 200) - expect(res).to.have.nested.property('result.Type', 5) - expect(res).to.have.nested.property('result.Extra', value) + expect(allNdjson(res)).to.deep.equal([{ + Extra: uint8ArrayToString(value, 'base64pad'), + Type: 5, + Responses: null + }]) }) }) describe('/provide', () => { const defaultOptions = { - signal: sinon.match.instanceOf(AbortSignal), - timeout: undefined + signal: sinon.match.instanceOf(AbortSignal) } it('only accepts POST', async () => { @@ -276,7 +243,7 @@ describe('/dht', () => { }) it('returns 500 if key is provided as the file was not added', async () => { - ipfs.dht.provide.withArgs(cid).throws(new Error('wut')) + ipfs.dht.provide.withArgs(cid, defaultOptions).throws(new Error('wut')) const res = await http({ method: 'POST', @@ -287,33 +254,50 @@ describe('/dht', () => { }) it('returns 200 if key is provided', async () => { - const res = await http({ - method: 'POST', - url: `/api/v0/dht/provide?arg=${cid}` - }, { ipfs }) - - expect(res).to.have.property('statusCode', 200) // needs file add - expect(ipfs.dht.provide.calledWith(cid, defaultOptions)).to.be.true() - }) + ipfs.dht.provide.withArgs(cid, defaultOptions).returns([{ + name: 'DIALING_PEER', + type: 7, + peer: peerId + }, { + name: 'SENDING_QUERY', + type: 0, + to: peerId + }, { + name: 'PEER_RESPONSE', + type: 1, + from: peerId, + closer: [], + providers: [] + }]) - it('accepts a timeout', async () => { const res = await http({ method: 'POST', - url: `/api/v0/dht/provide?arg=${cid}&timeout=1s` + url: `/api/v0/dht/provide?arg=${cid}` }, { ipfs }) expect(res).to.have.property('statusCode', 200) // needs file add - expect(ipfs.dht.provide.calledWith(cid, { - ...defaultOptions, - timeout: 1000 - })).to.be.true() + expect(allNdjson(res)).to.deep.equal([{ + Extra: '', + ID: peerId, + Type: 7, + Responses: null + }, { + Extra: '', + ID: peerId, + Type: 0, + Responses: null + }, { + Extra: '', + ID: peerId, + Type: 1, + Responses: [] + }]) }) }) describe('/put', () => { const defaultOptions = { - signal: sinon.match.instanceOf(AbortSignal), - timeout: undefined + signal: sinon.match.instanceOf(AbortSignal) } it('only accepts POST', async () => { @@ -337,38 +321,60 @@ describe('/dht', () => { it('returns 200 if key and value is provided', async function () { const key = 'key' - const value = 'value' + const value = Buffer.from('value') + + ipfs.dht.put.withArgs(key, value, defaultOptions).returns([{ + name: 'DIALING_PEER', + type: 7, + peer: peerId + }, { + name: 'SENDING_QUERY', + type: 0, + to: peerId + }, { + name: 'PEER_RESPONSE', + type: 1, + from: peerId, + closer: [], + providers: [] + }]) - const res = await http({ - method: 'POST', - url: `/api/v0/dht/put?arg=${key}&arg=${value}` - }, { ipfs }) + const form = new FormData() + form.append('data', value) + const headers = form.getHeaders() - expect(res).to.have.property('statusCode', 200) - expect(ipfs.dht.put.calledWith(Buffer.from(key), Buffer.from(value), defaultOptions)).to.be.true() - }) - - it('accepts a timeout', async function () { - const key = 'key' - const value = 'value' + const payload = await streamToPromise(form) const res = await http({ method: 'POST', - url: `/api/v0/dht/put?arg=${key}&arg=${value}&timeout=1s` + url: `/api/v0/dht/put?arg=${key}`, + headers, + payload }, { ipfs }) expect(res).to.have.property('statusCode', 200) - expect(ipfs.dht.put.calledWith(Buffer.from(key), Buffer.from(value), { - ...defaultOptions, - timeout: 1000 - })).to.be.true() + expect(allNdjson(res)).to.deep.equal([{ + Extra: '', + ID: peerId, + Type: 7, + Responses: null + }, { + Extra: '', + ID: peerId, + Type: 0, + Responses: null + }, { + Extra: '', + ID: peerId, + Type: 1, + Responses: [] + }]) }) }) describe('/query', () => { const defaultOptions = { - signal: sinon.match.instanceOf(AbortSignal), - timeout: undefined + signal: sinon.match.instanceOf(AbortSignal) } it('only accepts POST', async () => { @@ -392,7 +398,19 @@ describe('/dht', () => { it('returns 200 if key is provided', async function () { ipfs.dht.query.withArgs(peerId, defaultOptions).returns([{ - id: 'id' + name: 'DIALING_PEER', + type: 7, + peer: peerId + }, { + name: 'SENDING_QUERY', + type: 0, + to: peerId + }, { + name: 'PEER_RESPONSE', + type: 1, + from: peerId, + closer: [], + providers: [] }]) const res = await http({ @@ -401,24 +419,22 @@ describe('/dht', () => { }, { ipfs }) expect(res).to.have.property('statusCode', 200) - expect(JSON.parse(res.result)).to.have.property('ID', 'id') - }) - - it('accepts a timeout', async function () { - ipfs.dht.query.withArgs(peerId, { - ...defaultOptions, - timeout: 1000 - }).returns([{ - id: 'id' + expect(allNdjson(res)).to.deep.equal([{ + Extra: '', + ID: peerId, + Type: 7, + Responses: null + }, { + Extra: '', + ID: peerId, + Type: 0, + Responses: null + }, { + Extra: '', + ID: peerId, + Type: 1, + Responses: [] }]) - - const res = await http({ - method: 'POST', - url: `/api/v0/dht/query?arg=${peerId}&timeout=1s` - }, { ipfs }) - - expect(res).to.have.property('statusCode', 200) - expect(JSON.parse(res.result)).to.have.property('ID', 'id') }) }) }) diff --git a/packages/ipfs/package.json b/packages/ipfs/package.json index 650b17b94f..e13a42627f 100644 --- a/packages/ipfs/package.json +++ b/packages/ipfs/package.json @@ -64,7 +64,7 @@ "test:interface:client": "aegir test -f test/interface-client.js", "test:interface:http-js": "aegir test -f test/interface-http-js.js", "test:interface:http-go": "aegir test -f test/interface-http-go.js", - "test:interop": "cross-env IPFS_JS_EXEC=$PWD/src/cli.js IPFS_JS_MODULE=$PWD/dist IPFS_JS_HTTP_MODULE=$PWD/node_modules/ipfs-http-client IPFS_REUSEPORT=false ipfs-interop", + "test:interop": "cross-env DEBUG=$DEBUG IPFS_LOGGING=$IPFS_LOGGING IPFS_JS_EXEC=$PWD/src/cli.js IPFS_JS_MODULE=$PWD/dist IPFS_JS_HTTP_MODULE=$PWD/node_modules/ipfs-http-client IPFS_REUSEPORT=false ipfs-interop", "test:external": "aegir test-dependant", "clean": "rimraf ./dist", "dep-check": "aegir dep-check -i ipfs-core-types -i @types/* -i npm-run-all -i copyfiles" @@ -89,11 +89,11 @@ "ipfs-client": "^0.7.4", "ipfs-core-types": "^0.8.4", "ipfs-http-client": "^54.0.2", - "ipfs-interop": "^7.0.2", + "ipfs-interop": "ipfs/interop#feat/add-dht-interop-tests", "ipfs-utils": "^9.0.2", "ipfsd-ctl": "^10.0.4", "iso-url": "^1.0.0", - "libp2p-webrtc-star-signalling-server": "^0.1.0", + "libp2p-webrtc-star-signalling-server": "^0.1.1", "merge-options": "^3.0.4", "mock-ipfs-pinning-service": "^0.1.2", "npm-run-all": "^4.1.5", diff --git a/packages/ipfs/src/cli.js b/packages/ipfs/src/cli.js index 384b727159..b63e7ff2cd 100755 --- a/packages/ipfs/src/cli.js +++ b/packages/ipfs/src/cli.js @@ -41,6 +41,12 @@ const onUnhandledRejection = (err) => { process.once('uncaughtException', onUncaughtException) process.once('unhandledRejection', onUnhandledRejection) +if (process.env.DEBUG) { + process.on('warning', err => { + console.error(err.stack) + }) +} + const log = debug('ipfs:cli') process.title = pkg.name diff --git a/packages/ipfs/test/interface-core.js b/packages/ipfs/test/interface-core.js index cd6cbbd18f..532062a714 100644 --- a/packages/ipfs/test/interface-core.js +++ b/packages/ipfs/test/interface-core.js @@ -31,11 +31,7 @@ describe('interface-ipfs-core tests', function () { tests.dag(commonFactory) - tests.dht(commonFactory, { - skip: { - reason: 'TODO: unskip when DHT is enabled: https://github.com/ipfs/js-ipfs/pull/1994' - } - }) + tests.dht(commonFactory) tests.files(factory(), { skip: isNode diff --git a/packages/ipfs/test/interface-http-js.js b/packages/ipfs/test/interface-http-js.js index d485e8c3c7..1862667764 100644 --- a/packages/ipfs/test/interface-http-js.js +++ b/packages/ipfs/test/interface-http-js.js @@ -65,11 +65,7 @@ describe('interface-ipfs-core over ipfs-http-client tests against js-ipfs', func }] }) - tests.dht(commonFactory, { - skip: { - reason: 'TODO: unskip when DHT is enabled: https://github.com/ipfs/js-ipfs/pull/1994' - } - }) + tests.dht(commonFactory) tests.files(commonFactory, { skip: (isBrowser || isWebWorker