Skip to content
This repository has been archived by the owner on Jun 27, 2023. It is now read-only.

Commit

Permalink
feat: optional self emit (#85)
Browse files Browse the repository at this point in the history
  • Loading branch information
vasco-santos committed Jul 25, 2019
1 parent 2a4cae9 commit a9e73d7
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 5 deletions.
13 changes: 12 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,18 @@ Floodsub emits two kinds of events:

## API

See https://libp2p.github.io/js-libp2p-floodsub
### Create a floodsub implementation

```js
const options = {…}
const floodsub = new Floodsub(libp2pNode, options)
```

Options is an optional object with the following key-value pairs:

* **`emitSelf`**: boolean identifying whether the node should emit to self on publish, in the event of the topic being subscribed (defaults to **true**).

For more, see https://libp2p.github.io/js-libp2p-floodsub

## Contribute

Expand Down
18 changes: 14 additions & 4 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,27 @@ const noop = () => {}
*/
class FloodSub extends BaseProtocol {
/**
* @param {Object} libp2p
* @param {Object} libp2p an instance of Libp2p
* @param {Object} [options]
* @param {boolean} options.emitSelf if publish should emit to self, if subscribed, defaults to true
* @constructor
*/
constructor (libp2p) {
constructor (libp2p, options = {}) {
super('libp2p:floodsub', multicodec, libp2p)

/**
* List of our subscriptions
* @type {Set<string>}
*/
this.subscriptions = new Set()

/**
* Pubsub options
*/
this._options = {
emitSelf: true,
...options
}
}

/**
Expand Down Expand Up @@ -203,8 +213,8 @@ class FloodSub extends BaseProtocol {
topicIDs: topics
}

// Emit to self if I'm interested
this._emitMessages(topics, [message])
// Emit to self if I'm interested and it is enabled
this._options.emitSelf && this._emitMessages(topics, [message])

this._buildMessage(message, cb)
}
Expand Down
100 changes: 100 additions & 0 deletions test/emit-self.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/* eslint-env mocha */
/* eslint max-nested-callbacks: ["error", 5] */
'use strict'

const chai = require('chai')
chai.use(require('dirty-chai'))
chai.use(require('chai-spies'))
const expect = chai.expect
const series = require('async/series')

const FloodSub = require('../src')

const {
createNode
} = require('./utils')

const shouldNotHappen = (_) => expect.fail()

describe('emit self', () => {
const topic = 'Z'

describe('enabled', () => {
let nodeA
let fsA

before((done) => {
createNode((err, node) => {
if (err) {
return done(err)
}
nodeA = node
nodeA.start(done)
})
})

before((done) => {
fsA = new FloodSub(nodeA, { emitSelf: true })
fsA.start(done)
})

before(() => {
fsA.subscribe(topic)
})

after((done) => {
series([
(cb) => fsA.stop(cb),
(cb) => nodeA.stop(cb)
], done)
})

it('should emit to self on publish', async () => {
const promise = new Promise((resolve) => fsA.once(topic, resolve))

fsA.publish(topic, Buffer.from('hey'))

await promise
})
})

describe('disabled', () => {
let nodeA
let fsA

before((done) => {
createNode((err, node) => {
if (err) {
return done(err)
}
nodeA = node
nodeA.start(done)
})
})

before((done) => {
fsA = new FloodSub(nodeA, { emitSelf: false })
fsA.start(done)
})

before(() => {
fsA.subscribe(topic)
})

after((done) => {
series([
(cb) => fsA.stop(cb),
(cb) => nodeA.stop(cb)
], done)
})

it('should emit to self on publish', async () => {
fsA.once(topic, (m) => shouldNotHappen)

fsA.publish(topic, Buffer.from('hey'))

// Wait 1 second to guarantee that self is not noticed
await new Promise((resolve) => setTimeout(() => resolve(), 1000))
})
})
})

0 comments on commit a9e73d7

Please sign in to comment.