Skip to content
This repository has been archived by the owner on Feb 12, 2024. It is now read-only.

Commit

Permalink
refactor: convert pubsub API to async/await (#2672)
Browse files Browse the repository at this point in the history
* refactor: convert pubsub API to async/await

* refactor: switch wording to not enabled
  • Loading branch information
Alan Shaw authored Dec 13, 2019
1 parent 0921a82 commit 78f361e
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 86 deletions.
92 changes: 7 additions & 85 deletions src/core/components/pubsub.js
Original file line number Diff line number Diff line change
@@ -1,90 +1,12 @@
'use strict'

const callbackify = require('callbackify')
const OFFLINE_ERROR = require('../utils').OFFLINE_ERROR
const errcode = require('err-code')

module.exports = function pubsub (self) {
function checkOnlineAndEnabled () {
if (!self.isOnline()) {
throw errcode(new Error(OFFLINE_ERROR), 'ERR_OFFLINE')
}

if (!self.libp2p.pubsub) {
throw errcode(new Error('pubsub is not enabled'), 'ERR_PUBSUB_DISABLED')
}
}

module.exports = ({ libp2p }) => {
return {
subscribe: (topic, handler, options, callback) => {
if (typeof options === 'function') {
callback = options
options = {}
}

if (typeof callback === 'function') {
try {
checkOnlineAndEnabled()
} catch (err) {
return callback(err)
}

self.libp2p.pubsub.subscribe(topic, handler, options, callback)
return
}

try {
checkOnlineAndEnabled()
} catch (err) {
return Promise.reject(err)
}

return self.libp2p.pubsub.subscribe(topic, handler, options)
},

unsubscribe: (topic, handler, callback) => {
if (typeof callback === 'function') {
try {
checkOnlineAndEnabled()
} catch (err) {
return callback(err)
}

self.libp2p.pubsub.unsubscribe(topic, handler, callback)
return
}

try {
checkOnlineAndEnabled()
} catch (err) {
return Promise.reject(err)
}

return self.libp2p.pubsub.unsubscribe(topic, handler)
},

publish: callbackify(async (topic, data) => { // eslint-disable-line require-await
checkOnlineAndEnabled()

await self.libp2p.pubsub.publish(topic, data)
}),

ls: callbackify(async () => { // eslint-disable-line require-await
checkOnlineAndEnabled()

return self.libp2p.pubsub.ls()
}),

peers: callbackify(async (topic) => { // eslint-disable-line require-await
checkOnlineAndEnabled()

return self.libp2p.pubsub.peers(topic)
}),

setMaxListeners (n) {
checkOnlineAndEnabled()

self.libp2p.pubsub.setMaxListeners(n)
}
subscribe: (...args) => libp2p.pubsub.subscribe(...args),
unsubscribe: (...args) => libp2p.pubsub.unsubscribe(...args),
publish: (...args) => libp2p.pubsub.publish(...args),
ls: (...args) => libp2p.pubsub.getTopics(...args),
peers: (...args) => libp2p.pubsub.getSubscribers(...args),
setMaxListeners: (n) => libp2p.pubsub.setMaxListeners(n)
}
}
5 changes: 4 additions & 1 deletion src/core/components/start.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ const PeerBook = require('peer-book')
const IPNS = require('../ipns')
const routingConfig = require('../ipns/routing/config')
const defer = require('p-defer')
const { AlreadyInitializedError } = require('../errors')
const { AlreadyInitializedError, NotEnabledError } = require('../errors')
const Commands = require('./')

module.exports = ({
Expand Down Expand Up @@ -134,6 +134,9 @@ function createApi ({
config: Commands.config({ repo }),
init: () => { throw new AlreadyInitializedError() },
ping: Commands.ping({ libp2p }),
pubsub: libp2p.pubsub
? Commands.pubsub({ libp2p })
: () => { throw new NotEnabledError('pubsub not enabled') },
start: () => apiManager.api,
stop
}
Expand Down
11 changes: 11 additions & 0 deletions src/core/errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,14 @@ class NotStartedError extends Error {

NotStartedError.code = 'ERR_NOT_STARTED'
exports.NotStartedError = NotStartedError

class NotEnabledError extends Error {
constructor (message = 'not enabled') {
super(message)
this.name = 'NotEnabledError'
this.code = NotEnabledError.code
}
}

NotEnabledError.code = 'ERR_NOT_ENABLED'
exports.NotEnabledError = NotEnabledError

0 comments on commit 78f361e

Please sign in to comment.