diff --git a/src/core/components/pubsub.js b/src/core/components/pubsub.js index 8c5916b906..0954304f46 100644 --- a/src/core/components/pubsub.js +++ b/src/core/components/pubsub.js @@ -1,90 +1,12 @@ 'use strict' -const callbackify = require('callbackify') -const OFFLINE_ERROR = require('../utils').OFFLINE_ERROR -const errcode = require('err-code') - -module.exports = function pubsub (self) { - function checkOnlineAndEnabled () { - if (!self.isOnline()) { - throw errcode(new Error(OFFLINE_ERROR), 'ERR_OFFLINE') - } - - if (!self.libp2p.pubsub) { - throw errcode(new Error('pubsub is not enabled'), 'ERR_PUBSUB_DISABLED') - } - } - +module.exports = ({ libp2p }) => { return { - subscribe: (topic, handler, options, callback) => { - if (typeof options === 'function') { - callback = options - options = {} - } - - if (typeof callback === 'function') { - try { - checkOnlineAndEnabled() - } catch (err) { - return callback(err) - } - - self.libp2p.pubsub.subscribe(topic, handler, options, callback) - return - } - - try { - checkOnlineAndEnabled() - } catch (err) { - return Promise.reject(err) - } - - return self.libp2p.pubsub.subscribe(topic, handler, options) - }, - - unsubscribe: (topic, handler, callback) => { - if (typeof callback === 'function') { - try { - checkOnlineAndEnabled() - } catch (err) { - return callback(err) - } - - self.libp2p.pubsub.unsubscribe(topic, handler, callback) - return - } - - try { - checkOnlineAndEnabled() - } catch (err) { - return Promise.reject(err) - } - - return self.libp2p.pubsub.unsubscribe(topic, handler) - }, - - publish: callbackify(async (topic, data) => { // eslint-disable-line require-await - checkOnlineAndEnabled() - - await self.libp2p.pubsub.publish(topic, data) - }), - - ls: callbackify(async () => { // eslint-disable-line require-await - checkOnlineAndEnabled() - - return self.libp2p.pubsub.ls() - }), - - peers: callbackify(async (topic) => { // eslint-disable-line require-await - checkOnlineAndEnabled() - - return self.libp2p.pubsub.peers(topic) - }), - - setMaxListeners (n) { - checkOnlineAndEnabled() - - self.libp2p.pubsub.setMaxListeners(n) - } + subscribe: (...args) => libp2p.pubsub.subscribe(...args), + unsubscribe: (...args) => libp2p.pubsub.unsubscribe(...args), + publish: (...args) => libp2p.pubsub.publish(...args), + ls: (...args) => libp2p.pubsub.getTopics(...args), + peers: (...args) => libp2p.pubsub.getSubscribers(...args), + setMaxListeners: (n) => libp2p.pubsub.setMaxListeners(n) } } diff --git a/src/core/components/start.js b/src/core/components/start.js index bbae1cb271..214563b53e 100644 --- a/src/core/components/start.js +++ b/src/core/components/start.js @@ -5,7 +5,7 @@ const PeerBook = require('peer-book') const IPNS = require('../ipns') const routingConfig = require('../ipns/routing/config') const defer = require('p-defer') -const { AlreadyInitializedError } = require('../errors') +const { AlreadyInitializedError, NotEnabledError } = require('../errors') const Commands = require('./') module.exports = ({ @@ -134,6 +134,9 @@ function createApi ({ config: Commands.config({ repo }), init: () => { throw new AlreadyInitializedError() }, ping: Commands.ping({ libp2p }), + pubsub: libp2p.pubsub + ? Commands.pubsub({ libp2p }) + : () => { throw new NotEnabledError('pubsub not enabled') }, start: () => apiManager.api, stop } diff --git a/src/core/errors.js b/src/core/errors.js index a2ab2542a5..c7cf7ac938 100644 --- a/src/core/errors.js +++ b/src/core/errors.js @@ -41,3 +41,14 @@ class NotStartedError extends Error { NotStartedError.code = 'ERR_NOT_STARTED' exports.NotStartedError = NotStartedError + +class NotEnabledError extends Error { + constructor (message = 'not enabled') { + super(message) + this.name = 'NotEnabledError' + this.code = NotEnabledError.code + } +} + +NotEnabledError.code = 'ERR_NOT_ENABLED' +exports.NotEnabledError = NotEnabledError