Skip to content

Commit

Permalink
(refactor) use async/await where possible in tests/ws2
Browse files Browse the repository at this point in the history
  • Loading branch information
f3rno authored and JacobPlaster committed Feb 24, 2020
1 parent e0656cb commit bbf3166
Show file tree
Hide file tree
Showing 5 changed files with 385 additions and 351 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
4.0.7
- WSv2: refactor to use async/await style where possible
- WSv2: reconnect() now always resolves on completion

4.0.6
- WSv2: fix internal flag persistence #521

Expand Down
101 changes: 55 additions & 46 deletions lib/transports/ws2.js
Original file line number Diff line number Diff line change
Expand Up @@ -198,9 +198,9 @@ class WSv2 extends EventEmitter {
*
* @return {Promise} p
*/
open () {
async open () {
if (this._isOpen || this._ws !== null) {
return Promise.reject(new Error('already open'))
throw new Error('already open')
}

debug('connecting to %s...', this._url)
Expand All @@ -219,14 +219,14 @@ class WSv2 extends EventEmitter {

return new Promise((resolve, reject) => {
this._ws.on('open', () => {
if (this._enabledFlags !== 0) {
this.sendEnabledFlags()
}

// call manually instead of binding to open event so it fires at the
// right time
this._onWSOpen()

if (this._enabledFlags !== 0) {
this.sendEnabledFlags()
}

debug('connected')
resolve()
})
Expand All @@ -239,16 +239,16 @@ class WSv2 extends EventEmitter {
*
* @param {number} code - passed to ws
* @param {string} reason - passed to ws
* @return {Promise}
* @return {Promise} p
*/
close (code, reason) {
async close (code, reason) {
if (!this._isOpen || this._ws === null) {
return Promise.reject(new Error('not open'))
throw new Error('not open')
}

debug('disconnecting...')

return new Promise((resolve, reject) => {
return new Promise((resolve) => {
this._ws.once('close', () => {
this._isOpen = false
this._ws = null
Expand All @@ -275,10 +275,13 @@ class WSv2 extends EventEmitter {
* @param {number?} dms - optional dead man switch flag, active 4
* @return {Promise} p
*/
auth (calc, dms) {
if (!this._isOpen) return Promise.reject(new Error('not open'))
async auth (calc, dms) {
if (!this._isOpen) {
throw new Error('not open')
}

if (this._isAuthenticated) {
return Promise.reject(new Error('already authenticated'))
throw new Error('already authenticated')
}

const authNonce = nonce()
Expand All @@ -289,7 +292,7 @@ class WSv2 extends EventEmitter {
if (_isFinite(calc)) authArgs.calc = calc
if (_isFinite(dms)) authArgs.dms = dms

return new Promise((resolve, reject) => {
return new Promise((resolve) => {
this.once('auth', () => {
debug('authenticated')
resolve()
Expand Down Expand Up @@ -317,6 +320,10 @@ class WSv2 extends EventEmitter {

if (this._ws !== null && this._isOpen) { // did we get a watchdog timeout and need to close the connection?
await this.close()

return new Promise((resolve) => {
this.once(this._wasEverAuthenticated ? 'auth' : 'open', resolve)
})
} else {
await this.reconnectAfterClose() // we are already closed, so reopen and re-auth
}
Expand Down Expand Up @@ -393,11 +400,13 @@ class WSv2 extends EventEmitter {
/**
* Trigger the packet watch-dog; called when we haven't seen a new WS packet
* for longer than our WD duration (if provided)
*
* @return {Promise} p
* @private
*/
_triggerPacketWD () {
async _triggerPacketWD () {
if (!this._packetWDDelay || !this._isOpen) {
return Promise.resolve()
return
}

debug(
Expand Down Expand Up @@ -429,6 +438,9 @@ class WSv2 extends EventEmitter {
}, this._packetWDDelay)
}

/**
* Subscribes to previously subscribed channels, used after reconnecting
*/
resubscribePreviousChannels () {
Object.values(this._prevChannelMap).forEach((chan) => {
const { channel } = chan
Expand Down Expand Up @@ -458,7 +470,9 @@ class WSv2 extends EventEmitter {
break
}

default: {}
default: {
debug('unknown previously subscribed channel type: %s', channel)
}
}
})
}
Expand Down Expand Up @@ -486,7 +500,7 @@ class WSv2 extends EventEmitter {
/**
* @private
*/
_onWSClose () {
async _onWSClose () {
this._isOpen = false
this._isAuthenticated = false
this._lastAuthSeq = -1
Expand All @@ -504,17 +518,15 @@ class WSv2 extends EventEmitter {
if (this._isReconnecting || (this._autoReconnect && !this._isClosing)) {
this._prevChannelMap = this._channelMap

setTimeout(() => {
if (this._reconnectThrottler) {
this._reconnectThrottler
.add(this.reconnectAfterClose.bind(this))
.catch((err) => {
debug('error reconnectAfterClose: %s', err.stack)
})
} else {
this.reconnectAfterClose().catch((err) => {
debug('error reconnectAfterClose: %s', err.stack)
})
setTimeout(async () => {
try {
if (this._reconnectThrottler) {
await this._reconnectThrottler.add(this.reconnectAfterClose.bind(this))
} else {
await this.reconnectAfterClose()
}
} catch (err) {
debug('error reconnectAfterClose: %s', err.stack)
}
}, this._reconnectDelay)
}
Expand Down Expand Up @@ -1371,7 +1383,7 @@ class WSv2 extends EventEmitter {
* @param {boolean} args.audit - if true, an error is emitted on invalid seq
* @return {Promise} p
*/
enableSequencing (args = { audit: true }) {
async enableSequencing (args = { audit: true }) {
this._seqAudit = args.audit === true

return this.enableFlag(FLAGS.SEQ_ALL)
Expand Down Expand Up @@ -1588,9 +1600,9 @@ class WSv2 extends EventEmitter {
* @param {Object|Array} order
* @return {Promise} p - resolves on submit notification
*/
submitOrder (order) {
async submitOrder (order) {
if (!this._isAuthenticated) {
return Promise.reject(new Error('not authenticated'))
throw new Error('not authenticated')
}

const packet = Array.isArray(order)
Expand Down Expand Up @@ -1620,13 +1632,13 @@ class WSv2 extends EventEmitter {
* @param {Object} changes - requires at least an 'id'
* @return {Promise} p - resolves on receival of confirmation notification
*/
updateOrder (changes = {}) {
async updateOrder (changes = {}) {
const { id } = changes

if (!this._isAuthenticated) {
return Promise.reject(new Error('not authenticated'))
throw new Error('not authenticated')
} else if (!id) {
return Promise.reject(new Error('order ID required for update'))
throw new Error('order ID required for update')
}

this._sendOrderPacket([0, 'ou', null, changes])
Expand All @@ -1642,9 +1654,9 @@ class WSv2 extends EventEmitter {
* @param {Object|Array|number} order
* @return {Promise} p
*/
cancelOrder (order) {
async cancelOrder (order) {
if (!this._isAuthenticated) {
return Promise.reject(new Error('not authenticated'))
throw new Error('not authenticated')
}

const id = typeof order === 'number'
Expand All @@ -1668,14 +1680,12 @@ class WSv2 extends EventEmitter {
* @param {Object[]|Array[]|number[]} orders
* @return {Promise} p
*/
cancelOrders (orders) {
async cancelOrders (orders) {
if (!this._isAuthenticated) {
return Promise.reject(new Error('not authenticated'))
throw new Error('not authenticated')
}

return Promise.all(orders.map((order) => {
return this.cancelOrder(order)
}))
return Promise.all(orders.map(this.cancelOrder))
}

/**
Expand All @@ -1686,14 +1696,13 @@ class WSv2 extends EventEmitter {
* @param {Object[]} opPayloads
* @return {Promise} p - rejects if not authenticated
*/
submitOrderMultiOp (opPayloads) {
async submitOrderMultiOp (opPayloads) {
if (!this._isAuthenticated) {
return Promise.reject(new Error('not authenticated'))
throw new Error('not authenticated')
}

// TODO: multi-op tracking
this.send([0, 'ox_multi', null, opPayloads])

return Promise.resolve() // TODO: multi-op tracking
}

/**
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "bitfinex-api-node",
"version": "4.0.6",
"version": "4.0.7",
"description": "Node reference library for Bitfinex API",
"engines": {
"node": ">=7"
Expand Down
Loading

0 comments on commit bbf3166

Please sign in to comment.