Skip to content
This repository has been archived by the owner on Dec 1, 2024. It is now read-only.

streams2 write stream #177

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
195 changes: 64 additions & 131 deletions lib/write-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,9 @@
* <https://github.com/rvagg/node-levelup/blob/master/LICENSE>
*/

var Stream = require('stream').Stream
var Writable = require('readable-stream').Writable
, inherits = require('util').inherits
, extend = require('xtend')
, concatStream = require('concat-stream')

, setImmediate = require('./util').setImmediate

Expand All @@ -16,162 +15,96 @@ var Stream = require('stream').Stream
, defaultOptions = { type: 'put' }

function WriteStream (options, db) {
Stream.call(this)
Writable.call(this, { objectMode: true })
this._options = extend(defaultOptions, getOptions(db, options))
this._db = db
this._buffer = []
this._status = 'init'
this._end = false
this._buffer = []
this.writable = true
this.readable = false

var self = this
var ready = function () {
if (!self.writable)
return
self._status = 'ready'
self.emit('ready')
self._process()
}

if (db.isOpen())
setImmediate(ready)
else
db.once('ready', ready)
}

inherits(WriteStream, Stream)

WriteStream.prototype.write = function (data) {
if (!this.writable)
return false
this._buffer.push(data)
if (this._status != 'init')
this._processDelayed()
if (this._options.maxBufferLength &&
this._buffer.length > this._options.maxBufferLength) {
this._writeBlock = true
return false
}
return true
}

WriteStream.prototype.end = function (data) {
var self = this
if (data)
this.write(data)
setImmediate(function () {
self._end = true
self._process()
this.on('finish', function f () {
if (self._buffer && self._buffer.length) {
return self._flush(f)
}
self.writable = false
self.emit('close')
})
}

WriteStream.prototype.destroy = function () {
this.writable = false
this.end()
}
inherits(WriteStream, Writable)

WriteStream.prototype.destroySoon = function () {
this.end()
}

WriteStream.prototype.add = function (entry) {
if (!entry.props)
return
if (entry.props.Directory)
entry.pipe(this._db.writeStream(this._options))
else if (entry.props.File || entry.File || entry.type == 'File')
this._write(entry)
return true
}

WriteStream.prototype._processDelayed = function () {
WriteStream.prototype._write = function write (d, enc, next) {
var self = this
setImmediate(function () {
self._process()
})
}

WriteStream.prototype._process = function () {
var buffer
, self = this

, cb = function (err) {
if (!self.writable)
return
if (self._status != 'closed')
self._status = 'ready'
if (err) {
self.writable = false
return self.emit('error', err)
}
self._process()
}

if (self._status != 'ready' && self.writable) {
if (self._buffer.length && self._status != 'closed')
self._processDelayed()
if (self._destroyed)
return
if (!self._db.isOpen())
return self._db.once('ready', function () {
write.call(self, d, enc, next)
})

if (self._options.maxBufferLength &&
self._buffer.length > self._options.maxBufferLength) {
self.once('_flush', next)
}

if (self._buffer.length && self.writable) {
self._status = 'writing'
buffer = self._buffer
self._buffer = []

self._db.batch(buffer.map(function (d) {
return {
type : d.type || self._options.type
, key : d.key
, value : d.value
, keyEncoding : d.keyEncoding || self._options.keyEncoding
, valueEncoding : d.valueEncoding
|| d.encoding
|| self._options.valueEncoding
}
}), cb)

if (self._writeBlock) {
self._writeBlock = false
self.emit('drain')
else {
if (self._buffer.length === 0) {
setImmediate(function () { self._flush() })
}

// don't allow close until callback has returned
return
}

if (self._end && self._status != 'closed') {
self._status = 'closed'
self.writable = false
self.emit('close')
self._buffer.push(d)
next()
}
}

WriteStream.prototype._write = function (entry) {
var key = entry.path || entry.props.path
, self = this

if (!key)
return
WriteStream.prototype._flush = function (f) {
var self = this
var buffer = self._buffer
if (self._destroyed || !buffer) return

if (!self._db.isOpen()) {
return self._db.on('ready', function () { self._flush(f) })
}
self._buffer = []

self._db.batch(buffer.map(function (d) {
return {
type : d.type || self._options.type
, key : d.key
, value : d.value
, keyEncoding : d.keyEncoding || self._options.keyEncoding
, valueEncoding : d.valueEncoding
|| d.encoding
|| self._options.valueEncoding
}
}), cb)

entry.pipe(concatStream(function (err, data) {
function cb (err) {
if (err) {
self.writable = false
return self.emit('error', err)
self.emit('error', err)
}

if (self._options.fstreamRoot &&
key.indexOf(self._options.fstreamRoot) > -1)
key = key.substr(self._options.fstreamRoot.length + 1)

self.write({ key: key, value: data })
}))
else {
if (f) f()
self.emit('_flush')
}
}
}

WriteStream.prototype.toString = function () {
return 'LevelUP.WriteStream'
}

WriteStream.prototype.destroy = function () {
if (this._destroyed) return
this._buffer = null
this._destroyed = true
this.writable = false
this.emit('close')
}
WriteStream.prototype.destroySoon = function () {
this.end()
}

module.exports.create = function (options, db) {
return new WriteStream(options, db)
}
3 changes: 1 addition & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@
, "main" : "lib/levelup.js"
, "dependencies" : {
"errno" : "~0.0.3"
, "concat-stream" : "~0.1.1"
, "simple-bufferstream" : "~0.0.2"
, "readable-stream" : "~1.0.15"
, "xtend" : "~2.0.3"
, "prr" : "~0.0.0"
, "semver" : "~1.1.4"
Expand Down
28 changes: 11 additions & 17 deletions test/write-stream-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ buster.testCase('WriteStream', {
this.sourceData.forEach(function (d) {
ws.write(d)
})
ws.once('ready', ws.end) // end after it's ready, nextTick makes this work OK
ws.end()
}.bind(this))
}

Expand Down Expand Up @@ -160,7 +160,7 @@ buster.testCase('WriteStream', {
this.sourceData.forEach(function (d) {
ws.write(d)
})
ws.once('ready', ws.destroySoon) // end after it's ready, nextTick makes this work OK
ws.destroySoon()
}.bind(this))
}

Expand Down Expand Up @@ -196,7 +196,7 @@ buster.testCase('WriteStream', {
})
assert.isTrue(ws.writable)
assert.isFalse(ws.readable)
ws.once('ready', ws.destroy)
ws.destroy()
}.bind(this))
}

Expand All @@ -223,7 +223,7 @@ buster.testCase('WriteStream', {
data.forEach(function (d) {
ws.write(d)
})
ws.once('ready', ws.end) // end after it's ready, nextTick makes this work OK
ws.end()
}.bind(this))
}

Expand Down Expand Up @@ -260,8 +260,7 @@ buster.testCase('WriteStream', {
ws.write(d)
})

// end after it's ready, nextTick makes this work OK
ws.once('ready', ws.end)
ws.end()
},
function (db, cb) {
var delStream = db.createWriteStream()
Expand All @@ -276,8 +275,7 @@ buster.testCase('WriteStream', {
delStream.write(d)
})

// end after it's ready, nextTick makes this work OK
delStream.once('ready', delStream.end)
delStream.end()
},
function (db, cb) {
async.forEach(
Expand Down Expand Up @@ -329,8 +327,7 @@ buster.testCase('WriteStream', {
ws.write(d)
})

// end after it's ready, nextTick makes this work OK
ws.once('ready', ws.end)
ws.end()
},
function (db, cb) {
var delStream = db.createWriteStream({ type: 'del' })
Expand All @@ -344,8 +341,7 @@ buster.testCase('WriteStream', {
delStream.write(d)
})

// end after it's ready, nextTick makes this work OK
delStream.once('ready', delStream.end)
delStream.end()
},
function (db, cb) {
async.forEach(
Expand Down Expand Up @@ -400,8 +396,7 @@ buster.testCase('WriteStream', {
ws.write(d)
})

// end after it's ready, nextTick makes this work OK
ws.once('ready', ws.end)
ws.end()
},
function (db, cb) {
var delStream = db.createWriteStream({ type: 'del' })
Expand All @@ -415,8 +410,7 @@ buster.testCase('WriteStream', {
delStream.write(d)
})

// end after it's ready, nextTick makes this work OK
delStream.once('ready', delStream.end)
delStream.end()
},
function (db, cb) {
async.forEach(
Expand Down Expand Up @@ -455,7 +449,7 @@ buster.testCase('WriteStream', {
d.type = "x" + Math.random()
ws.write(d)
})
ws.once('ready', ws.end) // end after it's ready, nextTick makes this work OK
ws.end()
}.bind(this),
function (db, cb) {
async.forEach(
Expand Down