Skip to content

Commit

Permalink
Revert "use FixedQueue"
Browse files Browse the repository at this point in the history
This reverts commit 0e415b2.
  • Loading branch information
tsctx committed May 21, 2024
1 parent 826c84c commit c05988c
Showing 1 changed file with 30 additions and 12 deletions.
42 changes: 30 additions & 12 deletions lib/web/websocket/sender.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,27 @@

const { WebsocketFrameSend } = require('./frame')
const { opcodes, sendHints } = require('./constants')
const FixedQueue = require('../../dispatcher/fixed-queue')

/** @type {typeof Uint8Array} */
const FastBuffer = Buffer[Symbol.species]

/**
* @typedef {object} QueueNode
* @typedef {object} SendQueueNode
* @property {SendQueueNode | null} next
* @property {Promise<void> | null} promise
* @property {((...args: any[]) => any)} callback
* @property {Buffer | null} frame
*/

class SendQueue {
/**
* @type {FixedQueue | null}
* @type {SendQueueNode | null}
*/
#queue = null
#head = null
/**
* @type {SendQueueNode | null}
*/
#tail = null

/**
* @type {boolean}
Expand All @@ -39,19 +43,24 @@ class SendQueue {
// fast-path
this.#socket.write(frame, cb)
} else {
/** @type {QueueNode} */
/** @type {SendQueueNode} */
const node = {
next: null,
promise: null,
callback: cb,
frame
}
(this.#queue ??= new FixedQueue()).push(node)
if (this.#tail !== null) {
this.#tail.next = node
}
this.#tail = node
}
return
}

/** @type {QueueNode} */
/** @type {SendQueueNode} */
const node = {
next: null,
promise: item.arrayBuffer().then((ab) => {
node.promise = null
node.frame = createFrame(ab, hint)
Expand All @@ -60,7 +69,13 @@ class SendQueue {
frame: null
}

(this.#queue ??= new FixedQueue()).push(node)
if (this.#tail === null) {
this.#tail = node
}

if (this.#head === null) {
this.#head = node
}

if (!this.#running) {
this.#run()
Expand All @@ -69,10 +84,9 @@ class SendQueue {

async #run () {
this.#running = true
/** @type {FixedQueue} */
const queue = this.#queue
while (!queue.isEmpty()) {
const node = queue.shift()
/** @type {SendQueueNode | null} */
let node = this.#head
while (node !== null) {
// wait pending promise
if (node.promise !== null) {
await node.promise
Expand All @@ -81,7 +95,11 @@ class SendQueue {
this.#socket.write(node.frame, node.callback)
// cleanup
node.callback = node.frame = null
// set next
node = node.next
}
this.#head = null
this.#tail = null
this.#running = false
}
}
Expand Down

0 comments on commit c05988c

Please sign in to comment.