From 3764d060563a4fb4972b0dfbefdbd8c2a72e5cf5 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Wed, 28 Aug 2019 11:04:38 +0100 Subject: [PATCH] feat: browser pubsub (#1059) This PR enabled pubsub in the browser and paves the way for a switch to using `fetch` by default and allowing for cancelable requests via the use of `AbortController`. It's mostly the work done in https://github.com/ipfs-shipyard/js-ipfs-http-client-lite/pull/1 but adapted a bit for use here. If approved, we can start work moving the other commands to use `fetch`. The work in https://github.com/ipfs-shipyard/js-ipfs-http-client-lite has proven the hard parts (uploading files) are all possible using the `fetch` API. Since `fetch` is promise based, when moving the other commands it makes sense to just switch to async/await as per https://github.com/ipfs/js-ipfs/issues/1670 (and callbackify instead of promisify). Depends on: * [x] https://github.com/ipfs/interface-js-ipfs-core/pull/505 resolves https://github.com/ipfs/js-ipfs-http-client/issues/518 refs https://github.com/ipfs/js-ipfs/issues/2093 resolves https://github.com/ipfs/js-ipfs-http-client/issues/932 License: MIT Signed-off-by: Alan Shaw --- package.json | 8 +- src/lib/configure.js | 59 ++++++++ src/lib/error-handler.js | 31 ++++ src/lib/multiaddr.js | 18 +++ src/lib/stream-to-iterable.js | 25 ++++ src/pubsub.js | 212 ---------------------------- src/pubsub/index.js | 50 +++++++ src/pubsub/ls.js | 18 +++ src/pubsub/peers.js | 26 ++++ src/pubsub/publish.js | 45 ++++++ src/pubsub/subscribe.js | 85 +++++++++++ src/pubsub/subscription-tracker.js | 52 +++++++ src/pubsub/unsubscribe.js | 10 ++ src/utils/pubsub-message-stream.js | 34 ----- src/utils/pubsub-message-utils.js | 39 ----- src/utils/stringlist-to-array.js | 9 -- test/interface.spec.js | 18 ++- test/lib.configure.spec.js | 77 ++++++++++ test/lib.error-handler.spec.js | 54 +++++++ test/lib.stream-to-iterable.spec.js | 43 ++++++ test/pubsub-in-browser.spec.js | 162 --------------------- test/utils/throws-async.js | 10 ++ 22 files changed, 618 insertions(+), 467 deletions(-) create mode 100644 src/lib/configure.js create mode 100644 src/lib/error-handler.js create mode 100644 src/lib/multiaddr.js create mode 100644 src/lib/stream-to-iterable.js delete mode 100644 src/pubsub.js create mode 100644 src/pubsub/index.js create mode 100644 src/pubsub/ls.js create mode 100644 src/pubsub/peers.js create mode 100644 src/pubsub/publish.js create mode 100644 src/pubsub/subscribe.js create mode 100644 src/pubsub/subscription-tracker.js create mode 100644 src/pubsub/unsubscribe.js delete mode 100644 src/utils/pubsub-message-stream.js delete mode 100644 src/utils/pubsub-message-utils.js delete mode 100644 src/utils/stringlist-to-array.js create mode 100644 test/lib.configure.spec.js create mode 100644 test/lib.error-handler.spec.js create mode 100644 test/lib.stream-to-iterable.spec.js delete mode 100644 test/pubsub-in-browser.spec.js create mode 100644 test/utils/throws-async.js diff --git a/package.json b/package.json index 33cd7bde1..16dc4850b 100644 --- a/package.json +++ b/package.json @@ -33,6 +33,7 @@ "coverage": "npx nyc -r html npm run test:node -- --bail" }, "dependencies": { + "abort-controller": "^3.0.0", "async": "^2.6.1", "bignumber.js": "^9.0.0", "bl": "^3.0.0", @@ -44,6 +45,7 @@ "detect-node": "^2.0.4", "end-of-stream": "^1.4.1", "err-code": "^1.1.2", + "explain-error": "^1.0.4", "flatmap": "0.0.3", "glob": "^7.1.3", "ipfs-block": "~0.8.1", @@ -56,9 +58,12 @@ "is-stream": "^2.0.0", "iso-stream-http": "~0.1.2", "iso-url": "~0.4.6", + "iterable-ndjson": "^1.1.0", "just-kebab-case": "^1.1.0", "just-map-keys": "^1.1.0", "kind-of": "^6.0.2", + "ky": "^0.11.2", + "ky-universal": "^0.2.2", "lru-cache": "^5.1.1", "multiaddr": "^6.0.6", "multibase": "~0.6.0", @@ -68,6 +73,7 @@ "once": "^1.4.0", "peer-id": "~0.12.3", "peer-info": "~0.15.1", + "promise-nodeify": "^3.0.1", "promisify-es6": "^1.0.3", "pull-defer": "~0.2.3", "pull-stream": "^3.6.9", @@ -86,7 +92,7 @@ "cross-env": "^5.2.0", "dirty-chai": "^2.0.1", "go-ipfs-dep": "0.4.21", - "interface-ipfs-core": "^0.109.0", + "interface-ipfs-core": "^0.111.0", "ipfsd-ctl": "~0.43.0", "nock": "^10.0.2", "stream-equal": "^1.1.1" diff --git a/src/lib/configure.js b/src/lib/configure.js new file mode 100644 index 000000000..a9036d1cd --- /dev/null +++ b/src/lib/configure.js @@ -0,0 +1,59 @@ +'use strict' +/* eslint-env browser */ + +const ky = require('ky-universal').default +const { isBrowser, isWebWorker } = require('ipfs-utils/src/env') +const { toUri } = require('./multiaddr') +const errorHandler = require('./error-handler') + +// Set default configuration and call create function with them +module.exports = create => config => { + config = config || {} + + if (typeof config === 'string') { + config = { apiAddr: config } + } else if (config.constructor && config.constructor.isMultiaddr) { + config = { apiAddr: config } + } else { + config = { ...config } + } + + config.apiAddr = (config.apiAddr || getDefaultApiAddr(config)).toString() + config.apiAddr = config.apiAddr.startsWith('/') ? toUri(config.apiAddr) : config.apiAddr + config.apiPath = config.apiPath || config['api-path'] || '/api/v0' + + return create({ + // TODO configure ky to use config.fetch when this is released: + // https://github.com/sindresorhus/ky/pull/153 + ky: ky.extend({ + prefixUrl: config.apiAddr + config.apiPath, + timeout: config.timeout || 60 * 1000, + headers: config.headers, + hooks: { + afterResponse: [errorHandler] + } + }), + ...config + }) +} + +function getDefaultApiAddr ({ protocol, host, port }) { + if (isBrowser || isWebWorker) { + if (!protocol && !host && !port) { // Use current origin + return '' + } + + if (!protocol) { + protocol = location.protocol.startsWith('http') + ? location.protocol.split(':')[0] + : 'http' + } + + host = host || location.hostname + port = port || location.port + + return `${protocol}://${host}${port ? ':' + port : ''}` + } + + return `${protocol || 'http'}://${host || 'localhost'}:${port || 5001}` +} diff --git a/src/lib/error-handler.js b/src/lib/error-handler.js new file mode 100644 index 000000000..1e788227c --- /dev/null +++ b/src/lib/error-handler.js @@ -0,0 +1,31 @@ +'use strict' + +const { HTTPError } = require('ky-universal') +const log = require('debug')('ipfs-http-client:lib:error-handler') + +function isJsonResponse (res) { + return (res.headers.get('Content-Type') || '').startsWith('application/json') +} + +module.exports = async function errorHandler (response) { + if (response.ok) return + + let msg + + try { + if (isJsonResponse(response)) { + const data = await response.json() + log(data) + msg = data.Message || data.message + } else { + msg = await response.text() + } + } catch (err) { + log('Failed to parse error response', err) + // Failed to extract/parse error message from response + throw new HTTPError(response) + } + + if (!msg) throw new HTTPError(response) + throw Object.assign(new Error(msg), { status: response.status }) +} diff --git a/src/lib/multiaddr.js b/src/lib/multiaddr.js new file mode 100644 index 000000000..09462ab34 --- /dev/null +++ b/src/lib/multiaddr.js @@ -0,0 +1,18 @@ +'use strict' + +// Convert a multiaddr to a URI +// Assumes multiaddr is in a format that can be converted to a HTTP(s) URI +exports.toUri = ma => { + const parts = `${ma}`.split('/') + const port = getPort(parts) + return `${getProtocol(parts)}://${parts[2]}${port == null ? '' : ':' + port}` +} + +function getProtocol (maParts) { + return maParts.indexOf('https') === -1 ? 'http' : 'https' +} + +function getPort (maParts) { + const tcpIndex = maParts.indexOf('tcp') + return tcpIndex === -1 ? null : maParts[tcpIndex + 1] +} diff --git a/src/lib/stream-to-iterable.js b/src/lib/stream-to-iterable.js new file mode 100644 index 000000000..5e06a99c6 --- /dev/null +++ b/src/lib/stream-to-iterable.js @@ -0,0 +1,25 @@ +'use strict' + +module.exports = function toIterable (body) { + // Node.js stream + if (body[Symbol.asyncIterator]) return body + + // Browser ReadableStream + if (body.getReader) { + return (async function * () { + const reader = body.getReader() + + try { + while (true) { + const { done, value } = await reader.read() + if (done) return + yield value + } + } finally { + reader.releaseLock() + } + })() + } + + throw new Error('unknown stream') +} diff --git a/src/pubsub.js b/src/pubsub.js deleted file mode 100644 index 6b298351d..000000000 --- a/src/pubsub.js +++ /dev/null @@ -1,212 +0,0 @@ -'use strict' - -const promisify = require('promisify-es6') -const EventEmitter = require('events') -const eos = require('end-of-stream') -const isNode = require('detect-node') -const setImmediate = require('async/setImmediate') -const PubsubMessageStream = require('./utils/pubsub-message-stream') -const stringlistToArray = require('./utils/stringlist-to-array') -const moduleConfig = require('./utils/module-config') - -const NotSupportedError = () => new Error('pubsub is currently not supported when run in the browser') - -/* Public API */ -module.exports = (arg) => { - const send = moduleConfig(arg) - - /* Internal subscriptions state and functions */ - const ps = new EventEmitter() - const subscriptions = {} - ps.id = Math.random() - return { - subscribe: (topic, handler, options, callback) => { - const defaultOptions = { - discover: false - } - - if (typeof options === 'function') { - callback = options - options = defaultOptions - } - - if (!options) { - options = defaultOptions - } - - // Throw an error if ran in the browsers - if (!isNode) { - if (!callback) { - return Promise.reject(NotSupportedError()) - } - - return setImmediate(() => callback(NotSupportedError())) - } - - // promisify doesn't work as we always pass a - // function as last argument (`handler`) - if (!callback) { - return new Promise((resolve, reject) => { - subscribe(topic, handler, options, (err) => { - if (err) { - return reject(err) - } - resolve() - }) - }) - } - - subscribe(topic, handler, options, callback) - }, - unsubscribe: (topic, handler, callback) => { - if (!isNode) { - if (!callback) { - return Promise.reject(NotSupportedError()) - } - - return setImmediate(() => callback(NotSupportedError())) - } - - if (ps.listenerCount(topic) === 0 || !subscriptions[topic]) { - const err = new Error(`Not subscribed to '${topic}'`) - - if (!callback) { - return Promise.reject(err) - } - - return setImmediate(() => callback(err)) - } - - if (!handler && !callback) { - ps.removeAllListeners(topic) - } else { - ps.removeListener(topic, handler) - } - - // Drop the request once we are actually done - if (ps.listenerCount(topic) === 0) { - if (!callback) { - return new Promise((resolve, reject) => { - // When the response stream has ended, resolve the promise - eos(subscriptions[topic].res, (err) => { - // FIXME: Artificial timeout needed to ensure unsubscribed - setTimeout(() => { - if (err) return reject(err) - resolve() - }) - }) - subscriptions[topic].req.abort() - subscriptions[topic] = null - }) - } - - // When the response stream has ended, call the callback - eos(subscriptions[topic].res, (err) => { - // FIXME: Artificial timeout needed to ensure unsubscribed - setTimeout(() => callback(err)) - }) - subscriptions[topic].req.abort() - subscriptions[topic] = null - return - } - - if (!callback) { - return Promise.resolve() - } - - setImmediate(() => callback()) - }, - publish: promisify((topic, data, callback) => { - if (!isNode) { - return callback(NotSupportedError()) - } - - if (!Buffer.isBuffer(data)) { - return callback(new Error('data must be a Buffer')) - } - - const request = { - path: 'pubsub/pub', - args: [topic, data] - } - - send(request, callback) - }), - ls: promisify((callback) => { - if (!isNode) { - return callback(NotSupportedError()) - } - - const request = { - path: 'pubsub/ls' - } - - send.andTransform(request, stringlistToArray, callback) - }), - peers: promisify((topic, callback) => { - if (!isNode) { - return callback(NotSupportedError()) - } - - const request = { - path: 'pubsub/peers', - args: [topic] - } - - send.andTransform(request, stringlistToArray, callback) - }), - setMaxListeners (n) { - return ps.setMaxListeners(n) - } - } - - function subscribe (topic, handler, options, callback) { - ps.on(topic, handler) - - if (subscriptions[topic]) { - // TODO: should a callback error be returned? - return callback() - } - - // Request params - const request = { - path: 'pubsub/sub', - args: [topic], - qs: { - discover: options.discover - } - } - - // Start the request and transform the response - // stream to Pubsub messages stream - subscriptions[topic] = {} - subscriptions[topic].req = send.andTransform(request, PubsubMessageStream.from, (err, stream) => { - if (err) { - subscriptions[topic] = null - ps.removeListener(topic, handler) - return callback(err) - } - - subscriptions[topic].res = stream - - stream.on('data', (msg) => { - ps.emit(topic, msg) - }) - - stream.on('error', (err) => { - ps.emit('error', err) - }) - - eos(stream, (err) => { - if (err) { - ps.emit('error', err) - } - - subscriptions[topic] = null - ps.removeListener(topic, handler) - }) - - callback() - }) - } -} diff --git a/src/pubsub/index.js b/src/pubsub/index.js new file mode 100644 index 000000000..2738bd5ac --- /dev/null +++ b/src/pubsub/index.js @@ -0,0 +1,50 @@ +'use strict' + +const nodeify = require('promise-nodeify') + +// This file is temporary and for compatibility with legacy usage +module.exports = (send, options) => { + if (typeof send !== 'function') { + options = send + } + + const ls = require('./ls')(options) + const peers = require('./peers')(options) + const publish = require('./publish')(options) + const subscribe = require('./subscribe')(options) + const unsubscribe = require('./unsubscribe')(options) + + return { + ls: (options, callback) => { + if (typeof options === 'function') { + callback = options + options = {} + } + return nodeify(ls(options), callback) + }, + peers: (topic, options, callback) => { + if (typeof options === 'function') { + callback = options + options = {} + } + return nodeify(peers(topic, options), callback) + }, + publish: (topic, data, options, callback) => { + if (typeof options === 'function') { + callback = options + options = {} + } + return nodeify(publish(topic, data, options), callback) + }, + subscribe: (topic, handler, options, callback) => { + if (typeof options === 'function') { + callback = options + options = {} + } + return nodeify(subscribe(topic, handler, options), callback) + }, + unsubscribe: (topic, handler, callback) => { + return nodeify(unsubscribe(topic, handler), callback) + } + } +} diff --git a/src/pubsub/ls.js b/src/pubsub/ls.js new file mode 100644 index 000000000..177dcd491 --- /dev/null +++ b/src/pubsub/ls.js @@ -0,0 +1,18 @@ +'use strict' + +const configure = require('../lib/configure') + +module.exports = configure(({ ky }) => { + return async (options) => { + options = options || {} + + const { Strings } = await ky.get('pubsub/ls', { + timeout: options.timeout, + signal: options.signal, + headers: options.headers, + searchParams: options.searchParams + }).json() + + return Strings || [] + } +}) diff --git a/src/pubsub/peers.js b/src/pubsub/peers.js new file mode 100644 index 000000000..bdeca60e4 --- /dev/null +++ b/src/pubsub/peers.js @@ -0,0 +1,26 @@ +'use strict' + +const configure = require('../lib/configure') + +module.exports = configure(({ ky }) => { + return async (topic, options) => { + if (!options && typeof topic === 'object') { + options = topic + topic = null + } + + options = options || {} + + const searchParams = new URLSearchParams(options.searchParams) + searchParams.set('arg', topic) + + const { Strings } = await ky.get('pubsub/peers', { + timeout: options.timeout, + signal: options.signal, + headers: options.headers, + searchParams + }).json() + + return Strings || [] + } +}) diff --git a/src/pubsub/publish.js b/src/pubsub/publish.js new file mode 100644 index 000000000..a41c8fba0 --- /dev/null +++ b/src/pubsub/publish.js @@ -0,0 +1,45 @@ +'use strict' + +const { Buffer } = require('buffer') +const configure = require('../lib/configure') + +module.exports = configure(({ ky }) => { + return async (topic, data, options) => { + options = options || {} + data = Buffer.from(data) + + const searchParams = new URLSearchParams(options.searchParams) + searchParams.set('arg', topic) + + const res = await ky.post(`pubsub/pub?${searchParams}&arg=${encodeBuffer(data)}`, { + timeout: options.timeout, + signal: options.signal, + headers: options.headers + }).text() + + return res + } +}) + +function encodeBuffer (buf) { + let uriEncoded = '' + for (const byte of buf) { + // https://tools.ietf.org/html/rfc3986#page-14 + // ALPHA (%41-%5A and %61-%7A), DIGIT (%30-%39), hyphen (%2D), period (%2E), + // underscore (%5F), or tilde (%7E) + if ( + (byte >= 0x41 && byte <= 0x5A) || + (byte >= 0x61 && byte <= 0x7A) || + (byte >= 0x30 && byte <= 0x39) || + (byte === 0x2D) || + (byte === 0x2E) || + (byte === 0x5F) || + (byte === 0x7E) + ) { + uriEncoded += String.fromCharCode(byte) + } else { + uriEncoded += `%${byte.toString(16).padStart(2, '0')}` + } + } + return uriEncoded +} diff --git a/src/pubsub/subscribe.js b/src/pubsub/subscribe.js new file mode 100644 index 000000000..ae95ec5c8 --- /dev/null +++ b/src/pubsub/subscribe.js @@ -0,0 +1,85 @@ +'use strict' + +const ndjson = require('iterable-ndjson') +const explain = require('explain-error') +const bs58 = require('bs58') +const { Buffer } = require('buffer') +const log = require('debug')('ipfs-http-client:pubsub:subscribe') +const configure = require('../lib/configure') +const toIterable = require('../lib/stream-to-iterable') +const SubscriptionTracker = require('./subscription-tracker') + +module.exports = configure(({ ky }) => { + const subsTracker = SubscriptionTracker.singleton() + const publish = require('./publish')({ ky }) + + return async (topic, handler, options) => { + options = options || {} + options.signal = subsTracker.subscribe(topic, handler, options.signal) + + const searchParams = new URLSearchParams(options.searchParams) + searchParams.set('arg', topic) + if (options.discover != null) searchParams.set('discover', options.discover) + + let res + + // In Firefox, the initial call to fetch does not resolve until some data + // is received. If this doesn't happen within 1 second send an empty message + // to kickstart the process. + const ffWorkaround = setTimeout(async () => { + log(`Publishing empty message to "${topic}" to resolve subscription request`) + try { + await publish(topic, Buffer.alloc(0), options) + } catch (err) { + log('Failed to publish empty message', err) + } + }, 1000) + + try { + res = await ky.post('pubsub/sub', { + timeout: options.timeout, + signal: options.signal, + headers: options.headers, + searchParams + }) + } catch (err) { // Initial subscribe fail, ensure we clean up + subsTracker.unsubscribe(topic, handler) + throw err + } + + clearTimeout(ffWorkaround) + + readMessages(ndjson(toIterable(res.body)), { + onMessage: handler, + onEnd: () => subsTracker.unsubscribe(topic, handler), + onError: options.onError + }) + } +}) + +async function readMessages (msgStream, { onMessage, onEnd, onError }) { + onError = onError || log + + try { + for await (const msg of msgStream) { + try { + onMessage({ + from: bs58.encode(Buffer.from(msg.from, 'base64')).toString(), + data: Buffer.from(msg.data, 'base64'), + seqno: Buffer.from(msg.seqno, 'base64'), + topicIDs: msg.topicIDs + }) + } catch (err) { + onError(explain(err, 'Failed to parse pubsub message'), false, msg) // Not fatal + } + } + } catch (err) { + // FIXME: In testing with Chrome, err.type is undefined (should not be!) + // Temporarily use the name property instead. + if (err.type !== 'aborted' && err.name !== 'AbortError') { + onError(err, true) // Fatal + } + } finally { + onEnd() + } +} diff --git a/src/pubsub/subscription-tracker.js b/src/pubsub/subscription-tracker.js new file mode 100644 index 000000000..bbd7c2d7a --- /dev/null +++ b/src/pubsub/subscription-tracker.js @@ -0,0 +1,52 @@ +'use strict' + +const AbortController = require('abort-controller') + +class SubscriptionTracker { + constructor () { + this._subs = new Map() + } + + static singleton () { + if (SubscriptionTracker.instance) return SubscriptionTracker.instance + SubscriptionTracker.instance = new SubscriptionTracker() + return SubscriptionTracker.instance + } + + subscribe (topic, handler, signal) { + const topicSubs = this._subs.get(topic) || [] + + if (topicSubs.find(s => s.handler === handler)) { + throw new Error(`Already subscribed to ${topic} with this handler`) + } + + // Create controller so a call to unsubscribe can cancel the request + const controller = new AbortController() + + this._subs.set(topic, [{ handler, controller }].concat(topicSubs)) + + // If there is an external signal, forward the abort event + if (signal) { + signal.addEventListener('abort', () => this.unsubscribe(topic, handler)) + } + + return controller.signal + } + + unsubscribe (topic, handler) { + const subs = this._subs.get(topic) || [] + let unsubs + + if (handler) { + this._subs.set(topic, subs.filter(s => s.handler !== handler)) + unsubs = subs.filter(s => s.handler === handler) + } else { + this._subs.set(topic, []) + unsubs = subs + } + + unsubs.forEach(s => s.controller.abort()) + } +} + +module.exports = SubscriptionTracker diff --git a/src/pubsub/unsubscribe.js b/src/pubsub/unsubscribe.js new file mode 100644 index 000000000..6e7c727f4 --- /dev/null +++ b/src/pubsub/unsubscribe.js @@ -0,0 +1,10 @@ +'use strict' + +const configure = require('../lib/configure') +const SubscriptionTracker = require('./subscription-tracker') + +module.exports = configure(({ ky }) => { + const subsTracker = SubscriptionTracker.singleton() + // eslint-disable-next-line require-await + return async (topic, handler) => subsTracker.unsubscribe(topic, handler) +}) diff --git a/src/utils/pubsub-message-stream.js b/src/utils/pubsub-message-stream.js deleted file mode 100644 index d5925f714..000000000 --- a/src/utils/pubsub-message-stream.js +++ /dev/null @@ -1,34 +0,0 @@ -'use strict' - -const TransformStream = require('readable-stream').Transform -const PubsubMessage = require('./pubsub-message-utils') - -class PubsubMessageStream extends TransformStream { - constructor (options) { - const opts = Object.assign(options || {}, { objectMode: true }) - super(opts) - } - - static from (inputStream, callback) { - const outputStream = inputStream.pipe(new PubsubMessageStream()) - inputStream.on('end', () => outputStream.emit('end')) - callback(null, outputStream) - } - - _transform (obj, enc, callback) { - // go-ipfs returns '{}' as the very first object atm, we skip that - if (Object.keys(obj).length === 0) { - return callback() - } - - try { - const msg = PubsubMessage.deserialize(obj, 'base64') - this.push(msg) - callback() - } catch (err) { - return callback(err) - } - } -} - -module.exports = PubsubMessageStream diff --git a/src/utils/pubsub-message-utils.js b/src/utils/pubsub-message-utils.js deleted file mode 100644 index 53d1e397a..000000000 --- a/src/utils/pubsub-message-utils.js +++ /dev/null @@ -1,39 +0,0 @@ -'use strict' - -const bs58 = require('bs58') - -module.exports = { - deserialize (data, enc) { - enc = enc ? enc.toLowerCase() : 'json' - - if (enc === 'json') { - return deserializeFromJson(data) - } else if (enc === 'base64') { - return deserializeFromBase64(data) - } - - throw new Error(`Unsupported encoding: '${enc}'`) - } -} - -function deserializeFromJson (data) { - const json = JSON.parse(data) - return deserializeFromBase64(json) -} - -function deserializeFromBase64 (obj) { - if (!isPubsubMessage(obj)) { - throw new Error(`Not a pubsub message`) - } - - return { - from: bs58.encode(Buffer.from(obj.from, 'base64')).toString(), - seqno: Buffer.from(obj.seqno, 'base64'), - data: Buffer.from(obj.data, 'base64'), - topicIDs: obj.topicIDs || obj.topicCIDs - } -} - -function isPubsubMessage (obj) { - return obj && obj.from && obj.seqno && obj.data && (obj.topicIDs || obj.topicCIDs) -} diff --git a/src/utils/stringlist-to-array.js b/src/utils/stringlist-to-array.js deleted file mode 100644 index df28ee6df..000000000 --- a/src/utils/stringlist-to-array.js +++ /dev/null @@ -1,9 +0,0 @@ -'use strict' - -// Converts a go-ipfs "stringList" to an array -// { Strings: ['A', 'B'] } --> ['A', 'B'] -function stringlistToArray (res, cb) { - cb(null, res.Strings || []) -} - -module.exports = stringlistToArray diff --git a/test/interface.spec.js b/test/interface.spec.js index 220c79aa2..86ffac21d 100644 --- a/test/interface.spec.js +++ b/test/interface.spec.js @@ -226,7 +226,7 @@ describe('interface-ipfs-core tests', () => { tests.namePubsub(CommonFactory.create({ spawnOptions: { args: ['--enable-namesys-pubsub'], - initOptions: { bits: 1024 } + initOptions: { bits: 1024, profile: 'test' } } }), { skip: [ @@ -267,22 +267,20 @@ describe('interface-ipfs-core tests', () => { tests.pubsub(CommonFactory.create({ spawnOptions: { args: ['--enable-pubsub-experiment'], - initOptions: { bits: 1024 } + initOptions: { bits: 1024, profile: 'test' } } }), { - skip: isNode ? [ + skip: isWindows ? [ // pubsub.subscribe - isWindows ? { + { name: 'should send/receive 100 messages', reason: 'FIXME https://github.com/ipfs/interface-ipfs-core/pull/188#issuecomment-354673246 and https://github.com/ipfs/go-ipfs/issues/4778' - } : null, - isWindows ? { + }, + { name: 'should receive multiple messages', reason: 'FIXME https://github.com/ipfs/interface-ipfs-core/pull/188#issuecomment-354673246 and https://github.com/ipfs/go-ipfs/issues/4778' - } : null - ] : { - reason: 'FIXME pubsub is not supported in the browser https://github.com/ipfs/js-ipfs-http-client/issues/518' - } + } + ] : null }) tests.repo(defaultCommonFactory) diff --git a/test/lib.configure.spec.js b/test/lib.configure.spec.js new file mode 100644 index 000000000..f58ca4de7 --- /dev/null +++ b/test/lib.configure.spec.js @@ -0,0 +1,77 @@ +/* eslint-env mocha, browser */ +'use strict' + +const chai = require('chai') +const dirtyChai = require('dirty-chai') +const expect = chai.expect +chai.use(dirtyChai) +const Multiaddr = require('multiaddr') +const { isBrowser, isWebWorker } = require('ipfs-utils/src/env') + +const configure = require('../src/lib/configure') + +describe('lib/configure', () => { + it('should accept no config', () => { + configure(config => { + if (isBrowser || isWebWorker) { + expect(config.apiAddr).to.eql('') + } else { + expect(config.apiAddr).to.eql('http://localhost:5001') + } + })() + }) + + it('should accept string multiaddr', () => { + const input = '/ip4/127.0.0.1/tcp/5001' + configure(config => { + expect(config.apiAddr).to.eql('http://127.0.0.1:5001') + })(input) + }) + + it('should accept multiaddr instance', () => { + const input = Multiaddr('/ip4/127.0.0.1') + configure(config => { + expect(config.apiAddr).to.eql('http://127.0.0.1') + })(input) + }) + + it('should accept object with protocol, host and port', () => { + const input = { protocol: 'https', host: 'ipfs.io', port: 138 } + configure(config => { + expect(config.apiAddr).to.eql('https://ipfs.io:138') + })(input) + }) + + it('should accept object with protocol only', () => { + const input = { protocol: 'https' } + configure(config => { + if (isBrowser || isWebWorker) { + expect(config.apiAddr).to.eql(`https://${location.host}`) + } else { + expect(config.apiAddr).to.eql('https://localhost:5001') + } + })(input) + }) + + it('should accept object with host only', () => { + const input = { host: 'ipfs.io' } + configure(config => { + if (isBrowser || isWebWorker) { + expect(config.apiAddr).to.eql(`http://ipfs.io:${location.port}`) + } else { + expect(config.apiAddr).to.eql('http://ipfs.io:5001') + } + })(input) + }) + + it('should accept object with port only', () => { + const input = { port: 138 } + configure(config => { + if (isBrowser || isWebWorker) { + expect(config.apiAddr).to.eql(`http://${location.hostname}:138`) + } else { + expect(config.apiAddr).to.eql('http://localhost:138') + } + })(input) + }) +}) diff --git a/test/lib.error-handler.spec.js b/test/lib.error-handler.spec.js new file mode 100644 index 000000000..4e97260ec --- /dev/null +++ b/test/lib.error-handler.spec.js @@ -0,0 +1,54 @@ +/* eslint-env mocha */ +'use strict' + +const chai = require('chai') +const dirtyChai = require('dirty-chai') +const { HTTPError } = require('ky-universal') +const expect = chai.expect +chai.use(dirtyChai) +const throwsAsync = require('./utils/throws-async') +const errorHandler = require('../src/lib/error-handler') + +describe('lib/error-handler', () => { + it('should parse json error response', async () => { + const res = { + ok: false, + headers: { get: () => 'application/json' }, + json: () => Promise.resolve({ + Message: 'boom', + Code: 0, + Type: 'error' + }), + status: 500 + } + + const err = await throwsAsync(errorHandler(res)) + + expect(err.message).to.eql('boom') + expect(err.status).to.eql(500) + }) + + it('should gracefully fail on parse json', async () => { + const res = { + ok: false, + headers: { get: () => 'application/json' }, + json: () => 'boom', // not valid json! + status: 500 + } + + const err = await throwsAsync(errorHandler(res)) + expect(err instanceof HTTPError).to.be.true() + }) + + it('should gracefully fail on read text', async () => { + const res = { + ok: false, + headers: { get: () => 'text/plain' }, + text: () => Promise.reject(new Error('boom')), + status: 500 + } + + const err = await throwsAsync(errorHandler(res)) + expect(err instanceof HTTPError).to.be.true() + }) +}) diff --git a/test/lib.stream-to-iterable.spec.js b/test/lib.stream-to-iterable.spec.js new file mode 100644 index 000000000..6c14cac94 --- /dev/null +++ b/test/lib.stream-to-iterable.spec.js @@ -0,0 +1,43 @@ +/* eslint-env mocha */ +'use strict' + +const chai = require('chai') +const dirtyChai = require('dirty-chai') +const expect = chai.expect +chai.use(dirtyChai) +const toIterable = require('../src/lib/stream-to-iterable') + +describe('lib/stream-to-iterable', () => { + it('should return input if already async iterable', () => { + const input = { [Symbol.asyncIterator] () { return this } } + expect(toIterable(input)).to.equal(input) + }) + + it('should convert reader to async iterable', async () => { + const inputData = [2, 31, 3, 4] + const input = { + getReader () { + let i = 0 + return { + read () { + return i === inputData.length + ? { done: true } + : { value: inputData[i++] } + }, + releaseLock () {} + } + } + } + + const chunks = [] + for await (const chunk of toIterable(input)) { + chunks.push(chunk) + } + + expect(chunks).to.eql(inputData) + }) + + it('should throw on unknown stream', () => { + expect(() => toIterable({})).to.throw('unknown stream') + }) +}) diff --git a/test/pubsub-in-browser.spec.js b/test/pubsub-in-browser.spec.js deleted file mode 100644 index ff1a22347..000000000 --- a/test/pubsub-in-browser.spec.js +++ /dev/null @@ -1,162 +0,0 @@ -/* - We currently don't support pubsub when run in the browser, - and we test it with separate set of tests to make sure - if it's being used in the browser, pubsub errors. - - More info: https://github.com/ipfs/js-ipfs-http-client/issues/518 - - This means: - - You can use pubsub from js-ipfs-http-client in Node.js - - You can use pubsub from js-ipfs-http-client in Electron - (when js-ipfs-http-client is ran in the main process of Electron) - - - You can't use pubsub from js-ipfs-http-client in the browser - - You can't use pubsub from js-ipfs-http-client in Electron's - renderer process - - - You can use pubsub from js-ipfs in the browsers - - You can use pubsub from js-ipfs in Node.js - - You can use pubsub from js-ipfs in Electron - (in both the main process and the renderer process) - - See https://github.com/ipfs/js-ipfs for details on - pubsub in js-ipfs -*/ - -/* eslint-env mocha */ -/* eslint max-nested-callbacks: ['error', 8] */ -'use strict' - -const isNode = require('detect-node') -const chai = require('chai') -const dirtyChai = require('dirty-chai') -const expect = chai.expect -chai.use(dirtyChai) - -const ipfsClient = require('../src') -const f = require('./utils/factory') - -const expectedError = 'pubsub is currently not supported when run in the browser' - -describe('.pubsub is not supported in the browser, yet!', function () { - this.timeout(50 * 1000) - - if (isNode) { return } - - const topic = 'pubsub-tests' - let ipfs - let ipfsd - - before((done) => { - f.spawn({ initOptions: { bits: 1024, profile: 'test' } }, (err, _ipfsd) => { - expect(err).to.not.exist() - ipfsd = _ipfsd - ipfs = ipfsClient(_ipfsd.apiAddr) - done() - }) - }) - - after((done) => { - if (!ipfsd) return done() - ipfsd.stop(done) - }) - - describe('everything errors', () => { - describe('Callback API', () => { - describe('.publish', () => { - it('throws an error if called in the browser', (done) => { - ipfs.pubsub.publish(topic, 'hello friend', (err, topics) => { - expect(err).to.exist() - expect(err.message).to.equal(expectedError) - done() - }) - }) - }) - - describe('.subscribe', () => { - const handler = () => {} - it('throws an error if called in the browser', (done) => { - ipfs.pubsub.subscribe(topic, handler, {}, (err, topics) => { - expect(err).to.exist() - expect(err.message).to.equal(expectedError) - done() - }) - }) - }) - - describe('.peers', () => { - it('throws an error if called in the browser', (done) => { - ipfs.pubsub.peers(topic, (err, topics) => { - expect(err).to.exist() - expect(err.message).to.equal(expectedError) - done() - }) - }) - }) - - describe('.ls', () => { - it('throws an error if called in the browser', (done) => { - ipfs.pubsub.ls((err, topics) => { - expect(err).to.exist() - expect(err.message).to.equal(expectedError) - done() - }) - }) - }) - }) - - describe('Promise API', () => { - describe('.publish', () => { - it('throws an error if called in the browser', () => { - return ipfs.pubsub.publish(topic, 'hello friend') - .catch((err) => { - expect(err).to.exist() - expect(err.message).to.equal(expectedError) - }) - }) - }) - - describe('.subscribe', () => { - const handler = () => {} - it('throws an error if called in the browser', (done) => { - ipfs.pubsub.subscribe(topic, handler, {}) - .catch((err) => { - expect(err).to.exist() - expect(err.message).to.equal(expectedError) - done() - }) - }) - }) - - describe('.peers', () => { - it('throws an error if called in the browser', (done) => { - ipfs.pubsub.peers(topic) - .catch((err) => { - expect(err).to.exist() - expect(err.message).to.equal(expectedError) - done() - }) - }) - }) - - describe('.ls', () => { - it('throws an error if called in the browser', () => { - return ipfs.pubsub.ls() - .catch((err) => { - expect(err).to.exist() - expect(err.message).to.equal(expectedError) - }) - }) - }) - }) - - describe('.unsubscribe', () => { - it('throws an error if called in the browser', (done) => { - ipfs.pubsub.unsubscribe('test', () => {}, (err) => { - expect(err).to.exist() - expect(err.message).to.equal(expectedError) - done() - }) - }) - }) - }) -}) diff --git a/test/utils/throws-async.js b/test/utils/throws-async.js new file mode 100644 index 000000000..0d4e677fd --- /dev/null +++ b/test/utils/throws-async.js @@ -0,0 +1,10 @@ +'use strict' + +module.exports = async fnOrPromise => { + try { + await (fnOrPromise.then ? fnOrPromise : fnOrPromise()) + } catch (err) { + return err + } + throw new Error('did not throw') +}