Skip to content

Commit

Permalink
Update naming, fix bugs.
Browse files Browse the repository at this point in the history
Updated naming to be more like James' C++ impl:
nodejs/node#16414

Fixed some bugs. stdout-source works again.
  • Loading branch information
Fishrock123 committed Dec 1, 2017
1 parent f1f1d27 commit 1ffe4be
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 36 deletions.
4 changes: 2 additions & 2 deletions data-passthrough.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ class PassThrough {
this.sink.next(status, error, buffer, bytes)
}

read (error, buffer) {
this.source.read(error, buffer)
pull (error, buffer) {
this.source.pull(error, buffer)
}
}

Expand Down
4 changes: 2 additions & 2 deletions data-sink.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class Sink {
// write or process buffer

if (sinkError) {
return this.source.read(error)
return this.source.pull(error)
}

this._read()
Expand All @@ -47,7 +47,7 @@ class Sink {
// sink handles buffer allocation
const buffer = new Buffer(0)

this.source.read(null, buffer)
this.source.pull(null, buffer)
}
}

Expand Down
8 changes: 4 additions & 4 deletions data-source.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,18 @@ class Source {
this.sink = sink
}

read (error, buffer) {
pull (error, buffer) {
// error MUST be null or an error
// buffer MUST be a Buffer
if (error || sourceError) {
return this.source.next('error', error)
return this.sink.next('error', error)
}

// read into buffer
if (more) {
this.source.next('continue', null, buffer, bytesWritten)
this.sink.next('continue', null, buffer, bytesWritten)
} else {
this.source.next('end', null, buffer, bytesWritten)
this.sink.next('end', null, buffer, bytesWritten)
}
}
}
Expand Down
8 changes: 4 additions & 4 deletions fs/file-sink.js
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ FileSink.prototype.sink = function () {
if (typeof this.fd !== 'number') {
fs.open(this.path, this.flags, this.mode, (error, fd) => {
if (error) {
this.source.read(error)
this.source.pull(error)
}

this.fd = fd
Expand All @@ -98,17 +98,17 @@ FileSink.prototype.sink = function () {

FileSink.prototype._read = function _read () {
if (Buffer.isBuffer(this.buffer))
return this.source.read(null, this.buffer)
return this.source.pull(null, this.buffer)

try {
this.buffer = Buffer.allocUnsafe(64 * 1024)
} catch (error) {
return this.bindCb(error)
}
this.source.read(null, this.buffer)
this.source.pull(null, this.buffer)
}

FileSink.prototype.next = function next (status, error, bytes) {
FileSink.prototype.next = function next (status, error, _, bytes) {
if (status === 'end') {
return fs.close(this.fd, (closeError) => {
if (closeError) {
Expand Down
6 changes: 3 additions & 3 deletions fs/file-source.js
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ FileSource.prototype.bindSink = function bindSink (sink) {
this.sink = sink
}

FileSource.prototype.read = function(error, buffer) {
FileSource.prototype.pull = function(error, buffer) {
if (error) {
if (typeof this.fd === 'number') {
fs.close(this.fd, (closeError) => {
Expand Down Expand Up @@ -131,13 +131,13 @@ FileSource.prototype._read = function(buffer) {
} else {
if (bytesRead > 0) {
this.pos += bytesRead;
this.sink.next('continue', null, bytesRead)
this.sink.next('continue', null, buffer, bytesRead)
} else {
fs.close(this.fd, (closeError) => {
if (closeError) {
this.sink.next('error', closeError)
} else {
this.sink.next('end', null, -1)
this.sink.next('end', null, buffer, -1)
}
})
}
Expand Down
46 changes: 27 additions & 19 deletions stdio/stdout-sink.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,39 @@

const { Buffer } = require('buffer')

module.exports = function sink (bindCb) {
// bind((err, ?) => {})
class StdoutSink {
constructor () {
this.source = null
this.bindCb = null

try {
this._buffer = Buffer.allocUnsafe(64 * 1024)
} catch (error) {
this._allocError = error
}
}

return function (read) {
bindSource (source, bindCb) {
if (this._allocError) {
return bindCb(this._allocError)
}

let buffer
this.source = source
this.bindCb = bindCb

_read()
this.source.bindSink(this)

function next (status, error, bytes) {
if (status === 'end') return
if (error) bindCb(error)
this.source.pull(null, this._buffer)
}

process.stdout.write(buffer.slice(0, bytes))
next (status, error, buffer, bytes) {
if (status === 'end') return
if (error) bindCb(error)

_read()
}
process.stdout.write(buffer.slice(0, bytes))

function _read () {
try {
buffer = Buffer.allocUnsafe(64 * 1024)
} catch (error) {
return bindCb(error)
}
read(null, buffer, next)
}
this.source.pull(null, this._buffer)
}
}

module.exports = StdoutSink
12 changes: 10 additions & 2 deletions tests/file-to-console-test.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,15 @@
'use strict'

const FileSource = require('../fs/file-source')
const stdoutSink = require('../stdio/stdout-sink')
const StdoutSink = require('../stdio/stdout-sink')

const fileSource = new FileSource(process.argv[2])
stdoutSink(error => console.error('ERROR!', error))(fileSource.read.bind(fileSource))
const stdoutSink = new StdoutSink()

stdoutSink.bindSource(fileSource, error => {
if (error)
console.error('ERROR!', error)
else {
console.log('done')
}
})

0 comments on commit 1ffe4be

Please sign in to comment.