Skip to content

Commit

Permalink
feat: improve performance
Browse files Browse the repository at this point in the history
  • Loading branch information
dryajov authored and jacobheun committed Feb 7, 2019
1 parent 8dccda1 commit e9f9917
Show file tree
Hide file tree
Showing 9 changed files with 442 additions and 347 deletions.
6 changes: 4 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,16 @@
"license": "MIT",
"devDependencies": {
"aegir": "^13.0.6",
"pull-abortable": "^4.1.1"
"async": "^2.6.0",
"chai-checkmark": "^1.0.1",
"pull-abortable": "^4.1.1",
"pull-generate": "^2.2.0"
},
"repository": {
"type": "git",
"url": "git+https://github.com/dryajov/pull-plex.git"
},
"dependencies": {
"async": "^2.6.0",
"chai": "^4.1.2",
"debug": "^3.1.0",
"dirty-chai": "^2.0.1",
Expand Down
89 changes: 89 additions & 0 deletions profile.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
'use strict'

const pair = require('pull-pair/duplex')
const pull = require('pull-stream')
const generate = require('pull-generate')
const each = require('async/each')
const eachLimit = require('async/eachLimit')
const setImmediate = require('async/setImmediate')

const Plex = require('./src')

const spawn = (nStreams, nMsg, done, limit) => {
const p = pair()

const check = marker(2 * nStreams, done)

const msg = 'simple msg'

const listener = new Plex(false)
const dialer = new Plex(true)

pull(dialer, p[0], dialer)
pull(listener, p[1], listener)

listener.on('stream', (stream) => {
pull(
stream,
pull.onEnd((err) => {
if (err) { return done(err) }
check()
pull(pull.empty(), stream)
})
)
})

const numbers = []
for (let i = 0; i < nStreams; i++) {
numbers.push(i)
}

const spawnStream = (n, cb) => {
const stream = dialer.createStream()
pull(
generate(0, (s, cb) => {
setImmediate(() => {
cb(s === nMsg ? true : null, msg, s + 1)
})
}),
stream,
pull.collect((err) => {
if (err) { return done(err) }
check()
cb()
})
)
}

if (limit) {
eachLimit(numbers, limit, spawnStream, () => {})
} else {
each(numbers, spawnStream, () => {})
}
}

function marker (n, done) {
let i = 0
return (err) => {
i++

// console.log(`${i} out of ${n} interactions`)
if (err) {
console.error('Failed after %s iterations', i)
return done(err)
}

if (i === n) {
done()
}
}
}


spawn(1000, 1000, (err) => {
if (err) {
throw err
}
console.log('Done')
process.exit(0)
}, 50000)
111 changes: 34 additions & 77 deletions src/channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
const pushable = require('pull-pushable')

const consts = require('./consts')
const utils = require('./utils')
const EE = require('events')

const debug = require('debug')
Expand Down Expand Up @@ -41,9 +40,9 @@ class Channel extends EE {
this._msgs = pushable((err) => {
this._log('source closed', err)
if (this._reset) { return } // don't try closing the channel on reset
this.endChan((err) => {
if (err) { setImmediate(() => this.emit('error', err)) }
})

this.endChan()
if (err) { this.emit('error', err) }
})

this._source = this._msgs
Expand All @@ -59,28 +58,20 @@ class Channel extends EE {

// source ended, close the stream
if (end === true) {
this.endChan((err) => {
if (err) {
log.err(err)
setImmediate(() => this.emit('error', err))
}
})
return
return this.endChan()
}

// source errored, reset stream
if (end || this._reset) {
this.resetChan(() => {
setImmediate(() => this.emit('error', end || this._reset))
this.reset()
})
this.resetChan()
this.emit('error', end || this._reset)
this.reset()
return
}

// just send
return this.sendMsg(data, (err) => {
read(err, next)
})
this.sendMsg(data)
return read(null, next)
}

read(null, next)
Expand Down Expand Up @@ -126,97 +117,63 @@ class Channel extends EE {
this.close(this._reset)
}

openChan (cb) {
openChan () {
this._log('openChan')

this.open = true // avoid duplicate open msgs
utils.encodeMsg(this._id,
consts.NEW,
this._name,
(err, data) => {
if (err) {
log.err(err)
this.open = false
return cb(err)
}

this._plex.push(data)
cb(null, this)
})
this.open = true
this._plex.push([
this._id,
consts.type.NEW,
this._name
])
}

sendMsg (data, cb) {
sendMsg (data) {
this._log('sendMsg', data)

if (!this.open) {
return this.openChan((err) => {
if (err) {
log.err(err)
return cb(err)
}

this.sendMsg(data, cb)
})
this.openChan()
}

utils.encodeMsg(this._id,
this._plex.push([
this._id,
this._initiator
? consts.type.OUT_MESSAGE
: consts.type.IN_MESSAGE,
data,
(err, data) => {
if (err) {
log.err(err)
return cb(err)
}

this._plex.push(data)
cb()
})
data
])
}

endChan (cb) {
endChan () {
this._log('endChan')

if (!this.open) {
return cb()
return
}

utils.encodeMsg(this._id,
this._plex.push([
this._id,
this._initiator
? consts.type.OUT_CLOSE
: consts.type.IN_CLOSE,
'',
(err, data) => {
if (err) {
log.err(err)
return cb(err)
}
this._plex.push(data)
cb()
})
''
])
}

resetChan (cb) {
resetChan () {
this._log('endChan')

if (!this.open) {
return cb()
return
}

utils.encodeMsg(this._id,
this._plex.push([
this._id,
this._initiator
? consts.type.OUT_RESET
: consts.type.IN_RESET,
'',
(err, data) => {
if (err) {
log.err(err)
return cb(err)
}
this._plex.push(data)
cb()
})
''
])
}
}

Expand Down
Loading

0 comments on commit e9f9917

Please sign in to comment.