Skip to content

Commit

Permalink
Use promise API for abstract-level 2 compatibility (#18)
Browse files Browse the repository at this point in the history
Both abstract-level 1 and  abstract-level 2 are now supported.
  • Loading branch information
vweevers authored Oct 21, 2024
1 parent 6767696 commit 912041a
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 65 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
node: [12, 14, 16]
# TODO: drop < 18 in next major
node: [12, 14, 16, 18, 20, 22]
name: Node ${{ matrix.node }}
steps:
- name: Checkout
Expand Down
61 changes: 55 additions & 6 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@
const { Readable } = require('readable-stream')

const kIterator = Symbol('iterator')
const kPromises = Symbol('promises')
const kNextv = Symbol('nextv')
const kNextvLegacy = Symbol('nextvLegacy')
const kDestroy = Symbol('destroy')

class LevelReadStream extends Readable {
constructor (db, method, options) {
Expand All @@ -16,6 +19,13 @@ class LevelReadStream extends Readable {

this[kIterator] = db[method](rest)
this[kNextv] = this[kNextv].bind(this)
this[kNextvLegacy] = this[kNextvLegacy].bind(this)
this[kDestroy] = this.destroy.bind(this)

// Detect abstract-level 2 by the presence of hooks. Version 2 doesn't
// support callbacks anymore. Version 1 does also support promises but
// that would be slower because it works by wrapping the callback API.
this[kPromises] = db.hooks !== undefined

// NOTE: use autoDestroy option when it lands in readable-stream
this.once('end', this.destroy.bind(this, null, null))
Expand All @@ -27,10 +37,18 @@ class LevelReadStream extends Readable {

_read (size) {
if (this.destroyed) return
this[kIterator].nextv(size, this[kNextv])

if (this[kPromises]) {
this[kIterator].nextv(size).then(
this[kNextv],
this[kDestroy]
)
} else {
this[kIterator].nextv(size, this[kNextvLegacy])
}
}

[kNextv] (err, items) {
[kNextvLegacy] (err, items) {
if (this.destroyed) return
if (err) return this.destroy(err)

Expand All @@ -43,10 +61,29 @@ class LevelReadStream extends Readable {
}
}

[kNextv] (items) {
if (this.destroyed) return

if (items.length === 0) {
this.push(null)
} else {
for (const item of items) {
this.push(item)
}
}
}

_destroy (err, callback) {
this[kIterator].close(function (err2) {
callback(err || err2)
})
if (this[kPromises]) {
this[kIterator].close().then(
err ? () => callback(err) : callback,
callback
)
} else {
this[kIterator].close(function (err2) {
callback(err || err2)
})
}
}
}

Expand All @@ -55,7 +92,7 @@ class EntryStream extends LevelReadStream {
super(db, 'iterator', { ...options, keys: true, values: true })
}

[kNextv] (err, entries) {
[kNextvLegacy] (err, entries) {
if (this.destroyed) return
if (err) return this.destroy(err)

Expand All @@ -67,6 +104,18 @@ class EntryStream extends LevelReadStream {
}
}
}

[kNextv] (entries) {
if (this.destroyed) return

if (entries.length === 0) {
this.push(null)
} else {
for (const [key, value] of entries) {
this.push({ key, value })
}
}
}
}

class KeyStream extends LevelReadStream {
Expand Down
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
"readable-stream": "^3.4.0"
},
"peerDependencies": {
"abstract-level": "^1.0.0"
"abstract-level": ">=1.0.0"
},
"peerDependenciesMeta": {
"abstract-level": {
Expand All @@ -35,7 +35,7 @@
"airtap-playwright": "^1.0.1",
"faucet": "^0.0.3",
"hallmark": "^4.0.0",
"memory-level": "^1.0.0",
"memory-level": "^2.0.0",
"nyc": "^15.1.0",
"secret-event-listener": "^1.0.0",
"standard": "^16.0.3",
Expand Down
116 changes: 60 additions & 56 deletions test.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ const { EntryStream, KeyStream, ValueStream } = require('.')
const { Writable, pipeline } = require('readable-stream')
const addSecretListener = require('secret-event-listener')

const delayedPipeline = async (...args) => {
await pipeline(...args)
await new Promise(setImmediate)
}

let db
const kLastIterator = Symbol('lastIterator')
const data = [
Expand All @@ -14,7 +19,7 @@ const data = [
{ key: 'c', value: '3' }
]

test('setup', function (t) {
test('setup', async function (t) {
db = new MemoryLevel()

// Keep track of last created iterator for test purposes
Expand All @@ -27,37 +32,33 @@ test('setup', function (t) {
}
}

db.open(function (err) {
t.error(err, 'no error')
db.batch(data.map(x => ({ type: 'put', ...x })), function (err) {
t.error(err, 'no error')
t.end()
})
})
await db.open()
await db.batch(data.map(x => ({ type: 'put', ...x })))
})

test('EntryStream', function (t) {
t.plan(2)
test('EntryStream', async function (t) {
t.plan(1)

pipeline(new EntryStream(db), new Concat((acc) => {
// TODO: pipeline returns before Concat calls the callback
await delayedPipeline(new EntryStream(db), new Concat((acc) => {
t.same(acc, data)
}), t.ifError.bind(t))
}))
})

test('KeyStream', function (t) {
t.plan(2)
test('KeyStream', async function (t) {
t.plan(1)

pipeline(new KeyStream(db), new Concat((acc) => {
await delayedPipeline(new KeyStream(db), new Concat((acc) => {
t.same(acc, data.map(x => x.key))
}), t.ifError.bind(t))
}))
})

test('ValueStream', function (t) {
t.plan(2)
test('ValueStream', async function (t) {
t.plan(1)

pipeline(new ValueStream(db), new Concat((acc) => {
await delayedPipeline(new ValueStream(db), new Concat((acc) => {
t.same(acc, data.map(x => x.value))
}), t.ifError.bind(t))
}))
})

for (const Ctor of [EntryStream, KeyStream, ValueStream]) {
Expand All @@ -82,8 +83,8 @@ for (const Ctor of [EntryStream, KeyStream, ValueStream]) {
t.end()
})

db[kLastIterator]._nextv = function (size, options, cb) {
process.nextTick(cb, new Error('nextv'))
db[kLastIterator]._nextv = async function (size, options) {
throw new Error('nextv')
}

stream.resume()
Expand Down Expand Up @@ -142,7 +143,7 @@ for (const Ctor of [EntryStream, KeyStream, ValueStream]) {
t.end()
})

db[kLastIterator].nextv = function () {
db[kLastIterator].nextv = async function () {
stream.destroy()
}

Expand All @@ -156,7 +157,7 @@ for (const Ctor of [EntryStream, KeyStream, ValueStream]) {
t.end()
})

db[kLastIterator].nextv = function (size, options, cb) {
db[kLastIterator].nextv = async function (size, options) {
stream.destroy(new Error('user'))
}

Expand All @@ -171,10 +172,13 @@ for (const Ctor of [EntryStream, KeyStream, ValueStream]) {
t.end()
})

db[kLastIterator].nextv = function (size, options, cb) {
stream.destroy(new Error('user'), function (err) {
order.push('callback')
t.is(err.message, 'user', 'got error')
db[kLastIterator].nextv = async function (size, options) {
return new Promise((resolve) => {
stream.destroy(new Error('user'), function (err) {
order.push('callback')
t.is(err.message, 'user', 'got error')
resolve()
})
})
}

Expand All @@ -189,10 +193,13 @@ for (const Ctor of [EntryStream, KeyStream, ValueStream]) {
t.end()
})

db[kLastIterator].nextv = function (size, options, cb) {
stream.destroy(null, function (err) {
order.push('callback')
t.ifError(err, 'no error')
db[kLastIterator].nextv = async function (size, options) {
return new Promise((resolve) => {
stream.destroy(null, function (err) {
order.push('callback')
t.ifError(err, 'no error')
resolve()
})
})
}

Expand All @@ -203,10 +210,11 @@ for (const Ctor of [EntryStream, KeyStream, ValueStream]) {
const stream = new Ctor(db)
const iterator = db[kLastIterator]
const nextv = iterator.nextv.bind(iterator)
iterator.nextv = function (size, cb) {
iterator.nextv = async function (size) {
t.pass('should be called once')
nextv(size, cb)
const promise = nextv(size)
stream.destroy()
return promise
}
stream.on('data', function (data) {
t.fail('should not be called')
Expand All @@ -219,12 +227,13 @@ for (const Ctor of [EntryStream, KeyStream, ValueStream]) {
const iterator = db[kLastIterator]
const nextv = iterator.nextv.bind(iterator)
let count = 0
iterator.nextv = function (size, cb) {
iterator.nextv = async function (size) {
t.pass('should be called')
nextv(size, cb)
const promise = nextv(size)
if (++count === 2) {
stream.destroy()
}
return promise
}
stream.on('data', function (data) {
t.pass('should be called')
Expand All @@ -236,12 +245,11 @@ for (const Ctor of [EntryStream, KeyStream, ValueStream]) {
const stream = new Ctor(db)
const iterator = db[kLastIterator]
const nextv = iterator.nextv.bind(iterator)
iterator.nextv = function (size, cb) {
nextv(size, function (err, key, value) {
stream.destroy()
cb(err, key, value)
t.pass('should be called')
})
iterator.nextv = async function (size) {
const result = await nextv(size)
stream.destroy()
t.pass('should be called')
return result
}
stream.on('data', function (data) {
t.fail('should not be called')
Expand All @@ -254,14 +262,13 @@ for (const Ctor of [EntryStream, KeyStream, ValueStream]) {
const iterator = db[kLastIterator]
const nextv = iterator.nextv.bind(iterator)
let count = 0
iterator.nextv = function (size, cb) {
nextv(size, function (err, key, value) {
if (++count === 2) {
stream.destroy()
}
cb(err, key, value)
t.pass('should be called')
})
iterator.nextv = async function (size) {
const result = await nextv(size)
if (++count === 2) {
stream.destroy()
}
t.pass('should be called')
return result
}
stream.on('data', function (data) {
t.pass('should be called')
Expand Down Expand Up @@ -299,10 +306,7 @@ test('it is safe to close db on end of stream', function (t) {
// Although the underlying iterator is still alive at this point (before
// the 'close' event has been emitted) it's safe to close the db because
// leveldown (v5) ends any open iterators before closing.
db.close(function (err) {
t.ifError(err, 'no error')
t.end()
})
db.close().then(t.end.bind(t), t.fail.bind(t))
})

stream.resume()
Expand All @@ -314,9 +318,9 @@ function monitor (stream, onClose) {
;['_next', '_nextv', '_close'].forEach(function (method) {
const original = db[kLastIterator][method]

db[kLastIterator][method] = function () {
db[kLastIterator][method] = async function () {
order.push(method)
original.apply(this, arguments)
return original.apply(this, arguments)
}
})

Expand Down

0 comments on commit 912041a

Please sign in to comment.