Skip to content
This repository has been archived by the owner on Feb 24, 2021. It is now read-only.

Async endeavour: Use promises #108

Merged
merged 43 commits into from
Nov 22, 2019
Merged
Show file tree
Hide file tree
Changes from 33 commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
d7cbe8c
chore: upgrade deps
mkg20001 Jul 18, 2019
94e5119
feat: first iteration of the idea
mkg20001 Jul 18, 2019
0d7f1fa
feat: iterate a layer deeper
mkg20001 Jul 18, 2019
9c1a001
feat: rewrite handshake
mkg20001 Jul 18, 2019
cda8cb8
feat: rewrite propose
mkg20001 Jul 18, 2019
ac9b284
feat: rewrite finish
mkg20001 Jul 18, 2019
05b62f4
feat: rewrite exchange
mkg20001 Jul 18, 2019
022f1c8
feat: rewrite low-level stuff
mkg20001 Jul 18, 2019
2a52e87
feat: work on rewriting tests
mkg20001 Jul 18, 2019
40999a9
refactor: browser tests
mkg20001 Jul 18, 2019
4ca8858
refactor: .aegir.js
mkg20001 Jul 18, 2019
14ca273
feat: refactor benchmarks
mkg20001 Jul 18, 2019
59a08c5
fix: try to make it work
mkg20001 Jul 18, 2019
4cbcf38
fix: lint
mkg20001 Jul 18, 2019
61ac5c5
refactor: move tests
mkg20001 Oct 21, 2019
66a7ad6
refactor: switch deps
mkg20001 Oct 21, 2019
185f66f
refactor: entry file
mkg20001 Oct 21, 2019
3412264
refactor: a bit more
mkg20001 Oct 21, 2019
d947374
fix: tests
mkg20001 Oct 24, 2019
d13f142
feat: inital iterables refactor
mkg20001 Nov 7, 2019
80f9c71
refactor: streaming
mkg20001 Nov 7, 2019
e1bf46c
refactor: cleanup
mkg20001 Nov 7, 2019
e68a9a9
fix: turn bufferlist into buffer
mkg20001 Nov 7, 2019
d40e5d0
fix: use errors from interfaces
mkg20001 Nov 7, 2019
f838af1
refactor: etm
mkg20001 Nov 7, 2019
4da2808
fix: typo
mkg20001 Nov 7, 2019
51d29f2
fix: .read error
mkg20001 Nov 7, 2019
0b22ae1
fix: satisfy output expectations
mkg20001 Nov 7, 2019
afc8241
fix: it works - WARNING: using varint instead of fixed lp, tests lie
mkg20001 Nov 7, 2019
0790a3d
fix: use errors
mkg20001 Nov 7, 2019
5a2bb2a
refactor: benchmarks
mkg20001 Nov 7, 2019
0da751a
fix: add suggestions from review
mkg20001 Nov 12, 2019
f5ddc2f
fix: upgrade deps and use correct lp-encoder
mkg20001 Nov 13, 2019
7305d2b
refactor: apply changes from review
mkg20001 Nov 13, 2019
aea9a40
refactor: apply changes from review
mkg20001 Nov 13, 2019
55a7424
refactor: apply changes from review
mkg20001 Nov 13, 2019
ec435e6
chore: remove old tests
jacobheun Nov 19, 2019
8d418bf
test: fix async benchmarks
jacobheun Nov 20, 2019
72f05bf
chore: clean up deps
jacobheun Nov 21, 2019
840c217
fix: use fixed encoding/decoding everywhere
jacobheun Nov 21, 2019
41ca19b
test: add verify inbound and outbound secio
jacobheun Nov 21, 2019
22fe5e6
test: verify nonces are boxed
jacobheun Nov 21, 2019
f328ff8
chore: add node 12 to ci
jacobheun Nov 21, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 13 additions & 16 deletions .aegir.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ const multiaddr = require('multiaddr')
const pull = require('pull-stream')
const WS = require('libp2p-websockets')
const PeerId = require('peer-id')
const prom = (fnc) => new Promise((resolve, reject) => fnc((err, res) => err ? reject(err) : resolve(res)))
mkg20001 marked this conversation as resolved.
Show resolved Hide resolved

const secio = require('./src')

Expand All @@ -14,26 +15,22 @@ let listener
module.exports = {
hooks: {
browser: {
pre: (done) => {
PeerId.createFromJSON(peerNodeJSON, (err, peerId) => {
if (err) { throw err }
pre: async () => {
const peerId = await PeerId.createFromJSON(peerNodeJSON)
const ws = new WS()

const ws = new WS()
listener = ws.createListener((conn) => {
const encryptedConn = secio.encrypt(peerId, conn, undefined)
encryptedConn.catch(err => { throw err }) // TODO: make this better

listener = ws.createListener((conn) => {
const encryptedConn = secio.encrypt(peerId, conn, undefined, (err) => {
if (err) { throw err }
})

// echo
pull(encryptedConn, encryptedConn)
})

listener.listen(ma, done)
// echo
pull(encryptedConn, encryptedConn)
})

await prom(cb => listener.listen(ma, cb))
},
post: (done) => {
listener.close(done)
post: async () => {
return prom(cb => listener.close(cb))
}
}
}
Expand Down
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,18 @@ const secio = require('libp2p-secio')

The current `secio` tag, usable in `multistream`.

### `const encryptedConnection = secio.encrypt(localPeerId, plainTextConnection [, remotePeerId] [, callback])`
### `const encryptedConnection = secio.encrypt(localPeerId, plainTextConnection [, remotePeerId])`
mkg20001 marked this conversation as resolved.
Show resolved Hide resolved

- `localPeerId: PeerId` - A PeerId object containing the Private, Public and Id of our node.
- `plainTextConnection: Connection` - The insecure connection to be secured.
- `remotePeerId: PeerId` - A PeerId object containing the Public and/or Id of the node we are doing the SECIO handshake with.
- `callback: Function` - Optional, Called if an error happens during the initialization.

Returns an encrypted [Connection object](https://github.com/libp2p/interface-connection) that is the upgraded `plainTextConnection` with now having every byte encrypted.

Both plainTextConnection and encryptedConnection are at their base, PullStreams.

NOTE: The `encryptedConnection` also has a Promise under the key `encryptedConnection.awaitConnected` which you can await to make sure no errors occured and catch any if there occured any.

### This module uses `pull-streams`

We expose a streaming interface based on `pull-streams`, rather then on the Node.js core streams implementation (aka Node.js streams). `pull-streams` offers us a better mechanism for error handling and flow control guarantees. If you would like to know more about why we did this, see the discussion at this [issue](https://github.com/ipfs/js-ipfs/issues/362).
Expand Down
100 changes: 44 additions & 56 deletions benchmarks/send.js
Original file line number Diff line number Diff line change
@@ -1,73 +1,61 @@
'use strict'

/* eslint-disable no-console */

const Benchmark = require('benchmark')
const pull = require('pull-stream/pull')
const infinite = require('pull-stream/sources/infinite')
const take = require('pull-stream/throughs/take')
const drain = require('pull-stream/sinks/drain')
const Connection = require('interface-connection').Connection
const parallel = require('async/parallel')
const pair = require('pull-pair/duplex')
const PeerId = require('peer-id')

const secio = require('../src')
const pipe = require('it-pipe')
const { reduce } = require('streaming-iterables')
const DuplexPair = require('it-pair/duplex')

const secio = require('..')

const suite = new Benchmark.Suite('secio')
let peers

function sendData (a, b, opts, finish) {
async function sendData (a, b, opts, finish) {
opts = Object.assign({ times: 1, size: 100 }, opts)

pull(
infinite(() => Buffer.allocUnsafe(opts.size)),
take(opts.times),
a
)

let length = 0
let i = opts.times

pull(
b,
drain((data) => {
length += data.length
}, () => {
if (length !== opts.times * opts.size) {
throw new Error('Did not receive enough chunks')
pipe(
function * () {
while (i--) {
yield Buffer.allocUnsafe(opts.size)
}
finish.resolve()
})
},
a.sink
)

const res = await pipe(
b.source,
reduce((acc, val) => acc + val.length, 0)
)
}

function ifErr (err) {
if (err) {
throw err
if (res !== opts.times * opts.size) {
throw new Error('Did not receive enough chunks')
}
}

suite.add('create peers for test', (deferred) => {
parallel([
(cb) => PeerId.createFromJSON(require('./peer-a'), cb),
(cb) => PeerId.createFromJSON(require('./peer-b'), cb)
], (err, _peers) => {
if (err) { throw err }
peers = _peers

deferred.resolve()
})
}, { defer: true })
suite.add('create peers for test', async () => {
peers = await Promise.all([
PeerId.createFromJSON(require('./peer-a')),
PeerId.createFromJSON(require('./peer-b'))
])
})

suite.add('establish an encrypted channel', (deferred) => {
const p = pair()
suite.add('establish an encrypted channel', async () => {
const p = DuplexPair()

const peerA = peers[0]
const peerB = peers[1]

const aToB = secio.encrypt(peerA, new Connection(p[0]), peerB, ifErr)
const bToA = secio.encrypt(peerB, new Connection(p[1]), peerA, ifErr)
const aToB = await secio.secureInbound(peerA, p[0], peerB)
const bToA = await secio.secureOutbound(peerB, p[1], peerA)

sendData(aToB, bToA, {}, deferred)
}, { defer: true })
await sendData(aToB.conn, bToA.conn, {})
})

const cases = [
[10, 262144],
Expand All @@ -81,23 +69,23 @@ cases.forEach((el) => {
const times = el[0]
const size = el[1]

suite.add(`send plaintext ${times} x ${size} bytes`, (deferred) => {
const p = pair()
suite.add(`send plaintext ${times} x ${size} bytes`, async () => {
const p = DuplexPair()

sendData(p[0], p[1], { times: times, size: size }, deferred)
}, { defer: true })
await sendData(p[0], p[1], { times: times, size: size })
})

suite.add(`send encrypted ${times} x ${size} bytes`, (deferred) => {
const p = pair()
suite.add(`send encrypted ${times} x ${size} bytes`, async () => {
const p = DuplexPair()

const peerA = peers[0]
const peerB = peers[1]

const aToB = secio.encrypt(peerA, new Connection(p[0]), peerB, ifErr)
const bToA = secio.encrypt(peerB, new Connection(p[1]), peerA, ifErr)
const aToB = await secio.secureInbound(peerA, p[0], peerB)
const bToA = await secio.secureOutbound(peerB, p[0], peerA)

sendData(aToB, bToA, { times: times, size: size }, deferred)
}, { defer: true })
await sendData(aToB.conn, bToA.conn, { times: times, size: size })
})
})

suite.on('cycle', (event) => {
Expand Down
36 changes: 19 additions & 17 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,31 +25,33 @@
],
"license": "MIT",
"dependencies": {
"async": "^2.6.2",
"async": "^3.1.0",
jacobheun marked this conversation as resolved.
Show resolved Hide resolved
"bl": "^4.0.0",
"debug": "^4.1.1",
"interface-connection": "~0.3.3",
"libp2p-crypto": "~0.16.1",
"multiaddr": "^6.0.6",
"multihashing-async": "~0.6.0",
"it-buffer": "^0.1.1",
"it-handshake": "^1.0.1",
"it-length-prefixed": "^3.0.0",
"it-pair": "^1.0.0",
"it-pb-rpc": "^0.1.3",
"it-pipe": "^1.1.0",
"libp2p-crypto": "~0.17.1",
"libp2p-interfaces": "~0.1.3",
"multiaddr": "^7.2.1",
"multihashing-async": "~0.8.0",
"once": "^1.4.0",
jacobheun marked this conversation as resolved.
Show resolved Hide resolved
"peer-id": "~0.12.2",
"peer-info": "~0.15.1",
"peer-id": "~0.13.5",
"peer-info": "~0.16.2",
"protons": "^1.0.1",
"pull-defer": "~0.2.3",
"pull-handshake": "^1.1.4",
"pull-length-prefixed": "^1.3.2",
"pull-stream": "^3.6.9",
"safe-buffer": "^5.1.2"
"safe-buffer": "^5.2.0",
"streaming-iterables": "^4.1.1"
},
"devDependencies": {
"aegir": "^18.2.2",
"aegir": "^20.4.1",
"benchmark": "^2.1.4",
"chai": "^4.2.0",
"dirty-chai": "^2.0.1",
"libp2p-websockets": "~0.12.2",
"multistream-select": "~0.14.4",
"pull-goodbye": "~0.0.2",
"pull-pair": "^1.1.0"
"libp2p-websockets": "~0.13.1",
"multistream-select": "~0.15.1"
},
"engines": {
"node": ">=6.0.0",
Expand Down
78 changes: 19 additions & 59 deletions src/etm.js
Original file line number Diff line number Diff line change
@@ -1,81 +1,41 @@
'use strict'

const pull = require('pull-stream/pull')
const map = require('pull-stream/throughs/map')
const asyncMap = require('pull-stream/throughs/async-map')
const lp = require('pull-length-prefixed')

const lpOpts = {
fixed: true,
bytes: 4
}
const BufferList = require('bl/BufferList')
const { InvalidCryptoTransmissionError } = require('libp2p-interfaces/src/crypto/errors')

exports.createBoxStream = (cipher, mac) => {
return pull(
ensureBuffer(),
asyncMap((chunk, cb) => {
cipher.encrypt(chunk, (err, data) => {
if (err) {
return cb(err)
}

mac.digest(data, (err, digest) => {
if (err) {
return cb(err)
}

cb(null, Buffer.concat([data, digest]))
})
})
}),
lp.encode(lpOpts)
)
return async function * (source) {
for await (const chunk of source) {
const data = await cipher.encrypt(chunk)
const digest = await mac.digest(data)
yield new BufferList([data, digest])
}
}
}

exports.createUnboxStream = (decipher, mac) => {
return pull(
ensureBuffer(),
lp.decode(lpOpts),
asyncMap((chunk, cb) => {
return async function * (source) {
for await (const chunk of source) {
const l = chunk.length
const macSize = mac.length

if (l < macSize) {
return cb(new Error(`buffer (${l}) shorter than MAC size (${macSize})`))
throw new InvalidCryptoTransmissionError(`buffer (${l}) shorter than MAC size (${macSize})`)
}

const mark = l - macSize
const data = chunk.slice(0, mark)
const macd = chunk.slice(mark)

mac.digest(data, (err, expected) => {
if (err) {
return cb(err)
}
const expected = await mac.digest(data)

if (!macd.equals(expected)) {
return cb(new Error(`MAC Invalid: ${macd.toString('hex')} != ${expected.toString('hex')}`))
}

// all good, decrypt
decipher.decrypt(data, (err, decrypted) => {
if (err) {
return cb(err)
}
if (!macd.equals(expected)) {
throw new InvalidCryptoTransmissionError(`MAC Invalid: ${macd.toString('hex')} != ${expected.toString('hex')}`)
}

cb(null, decrypted)
})
})
})
)
}
const decrypted = await decipher.decrypt(data)

function ensureBuffer () {
return map((c) => {
if (typeof c === 'string') {
return Buffer.from(c, 'utf-8')
yield decrypted
}

return c
})
}
}
Loading