Skip to content

Commit

Permalink
fix: socket back pressure memory leak
Browse files Browse the repository at this point in the history
Fixes: #434
  • Loading branch information
ronag committed Sep 24, 2020
1 parent 6a22361 commit 3f1deb0
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 4 deletions.
24 changes: 20 additions & 4 deletions lib/core/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,7 @@ class Parser extends HTTPParser {

try {
if (request.onBody(chunk, offset, length) === false) {
socket.pause()
socket[kPause]()
}
} catch (err) {
util.destroy(socket, err)
Expand Down Expand Up @@ -630,7 +630,7 @@ class Parser extends HTTPParser {
util.destroy(socket, new InformationalError('reset'))
}
} else {
socket.resume()
socket[kResume]()
resume(client)
}
}
Expand Down Expand Up @@ -779,8 +779,8 @@ function connect (client) {
parser.consume(socket._handle._externalStream)
}

socket[kPause] = socket.pause.bind(socket)
socket[kResume] = socket.resume.bind(socket)
socket[kPause] = socketPause.bind(socket)
socket[kResume] = socketResume.bind(socket)
socket[kError] = null
socket[kParser] = parser
socket[kClient] = client
Expand All @@ -794,6 +794,22 @@ function connect (client) {
.on('close', onSocketClose)
}

function socketPause () {
// TODO: Pause parser.
if (this._handle && this._handle.reading) {
this._handle.reading = false
this._handle.readStop()
}
}

function socketResume () {
// TODO: Resume parser.
if (this._handle && !this._handle.reading) {
this._handle.reading = true
this._handle.readStart()
}
}

function emitDrain (client) {
client[kNeedDrain] = 0
client.emit('drain')
Expand Down
45 changes: 45 additions & 0 deletions test/socket-back-pressure.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
'use strict'

const { Client } = require('..')
const { createServer } = require('http')
const { Readable } = require('stream')
const { test } = require('tap')

test('socket back-pressure', (t) => {
t.plan(2)

const server = createServer()

let body

server.on('request', (req, res) => {
let bytesWritten = 0
const buf = Buffer.allocUnsafe(16384)
new Readable({
read () {
bytesWritten += buf.length
this.push(buf)
if (bytesWritten >= 1e6) {
this.push(null)
}
}
}).on('end', () => {
t.ok(body._readableState.length < body._readableState.highWaterMark)
}).pipe(res)
})

server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`, {
pipelining: 1
})

client.request({ path: '/', method: 'GET', opaque: 'asd' }, (err, data) => {
t.error(err)
body = data.body
.resume()
.on('data', () => {
data.body.pause()
})
})
})
})

0 comments on commit 3f1deb0

Please sign in to comment.