diff --git a/binding.cc b/binding.cc index 2ba2e2c6..84f754ee 100644 --- a/binding.cc +++ b/binding.cc @@ -689,6 +689,8 @@ struct Iterator { bool IteratorNext (std::vector >& result) { size_t size = 0; + uint32_t cacheSize = 0; + while (true) { std::string key, value; bool ok = Read(key, value); @@ -704,6 +706,9 @@ struct Iterator { size = size + key.size() + value.size(); if (size > highWaterMark_) return true; + // Limit the size of the cache to prevent starving the event loop + // in JS-land while we're recursively calling process.nextTick(). + if (++cacheSize >= 1000) return true; } else { return false; } diff --git a/iterator.js b/iterator.js index 72e47e74..2115eef2 100644 --- a/iterator.js +++ b/iterator.js @@ -1,6 +1,5 @@ const util = require('util') const AbstractIterator = require('abstract-leveldown').AbstractIterator -const fastFuture = require('fast-future') const binding = require('./binding') function Iterator (db, options) { @@ -9,7 +8,6 @@ function Iterator (db, options) { this.context = binding.iterator_init(db.context, options) this.cache = null this.finished = false - this.fastFuture = fastFuture() } util.inherits(Iterator, AbstractIterator) @@ -26,20 +24,11 @@ Iterator.prototype._seek = function (target) { Iterator.prototype._next = function (callback) { var that = this - var key - var value if (this.cache && this.cache.length) { - key = this.cache.pop() - value = this.cache.pop() - - this.fastFuture(function () { - callback(null, key, value) - }) + process.nextTick(callback, null, this.cache.pop(), this.cache.pop()) } else if (this.finished) { - this.fastFuture(function () { - callback() - }) + process.nextTick(callback) } else { binding.iterator_next(this.context, function (err, array, finished) { if (err) return callback(err) diff --git a/package.json b/package.json index 22c87e97..b194faad 100644 --- a/package.json +++ b/package.json @@ -26,7 +26,6 @@ }, "dependencies": { "abstract-leveldown": "~6.0.3", - "fast-future": "~1.0.2", "napi-macros": "~1.8.1", "node-gyp-build": "~4.1.0" }, diff --git a/test/iterator-starvation-test.js b/test/iterator-starvation-test.js new file mode 100644 index 00000000..08b85718 --- /dev/null +++ b/test/iterator-starvation-test.js @@ -0,0 +1,124 @@ +'use strict' + +const test = require('tape') +const testCommon = require('./common') +const sourceData = [] + +// For this test the number of records in the db must be a multiple of +// the hardcoded fast-future limit (1000) or a cache size limit in C++. +for (let i = 0; i < 1e4; i++) { + sourceData.push({ + type: 'put', + key: i.toString(), + value: '' + }) +} + +test('setUp', testCommon.setUp) + +test('iterator does not starve event loop', function (t) { + t.plan(6) + + const db = testCommon.factory() + + db.open(function (err) { + t.ifError(err, 'no open error') + + // Insert test data + db.batch(sourceData.slice(), function (err) { + t.ifError(err, 'no batch error') + + // Set a high highWaterMark to fill up the cache entirely + const it = db.iterator({ highWaterMark: Math.pow(1024, 3) }) + + let breaths = 0 + let entries = 0 + let scheduled = false + + // Iterate continuously while also scheduling work with setImmediate(), + // which should be given a chance to run because we limit the tick depth. + const next = function () { + it.next(function (err, key, value) { + if (err || (key === undefined && value === undefined)) { + t.ifError(err, 'no next error') + t.is(entries, sourceData.length, 'got all data') + t.is(breaths, sourceData.length / 1000, 'breathed while iterating') + + return db.close(function (err) { + t.ifError(err, 'no close error') + }) + } + + entries++ + + if (!scheduled) { + scheduled = true + setImmediate(function () { + breaths++ + scheduled = false + }) + } + + next() + }) + } + + next() + }) + }) +}) + +test('iterator with seeks does not starve event loop', function (t) { + t.plan(6) + + const db = testCommon.factory() + + db.open(function (err) { + t.ifError(err, 'no open error') + + db.batch(sourceData.slice(), function (err) { + t.ifError(err, 'no batch error') + + const it = db.iterator({ highWaterMark: Math.pow(1024, 3), limit: sourceData.length }) + + let breaths = 0 + let entries = 0 + let scheduled = false + + const next = function () { + it.next(function (err, key, value) { + if (err || (key === undefined && value === undefined)) { + t.ifError(err, 'no next error') + t.is(entries, sourceData.length, 'got all data') + t.is(breaths, sourceData.length, 'breathed while iterating') + + return db.close(function (err) { + t.ifError(err, 'no close error') + }) + } + + entries++ + + if (!scheduled) { + // Seeking clears the cache, which should only have a positive + // effect because it means the cache must be refilled, which + // again gives us time to breathe. This is a smoke test, really. + it.seek(sourceData[0].key) + + scheduled = true + setImmediate(function () { + breaths++ + scheduled = false + }) + } + + next() + }) + } + + next() + }) + }) +}) + +test('tearDown', testCommon.tearDown)