diff --git a/test/multithreading-test.js b/test/multithreading-test.js index 2a5f1a3..8149c71 100644 --- a/test/multithreading-test.js +++ b/test/multithreading-test.js @@ -5,7 +5,18 @@ const tempy = require("tempy"); const path = require("path"); const { Worker } = require("worker_threads"); const { ClassicLevel } = require(".."); -const { CLOSED_DB_MESSAGE } = require("./worker-utils"); +const { + MIN_KEY, + MID_KEY, + MAX_KEY, + CLOSED_DB_MESSAGE, + WORKER_CREATING_KEYS_MESSAGE, + WORKER_READY_TO_READ_MESSAGE, + WORKER_ERROR_MESSAGE, + START_READING_MESSAGE, + createRandomKeys, + getRandomKeys, +} = require("./worker-utils"); /** * Makes sure that the allowMultiThreading flag is working as expected @@ -53,23 +64,19 @@ test("open/close mutex works as expected", async function (t) { for (let i = 0; i < 100; i++) { const worker = new Worker(path.join(__dirname, "worker-test.js"), { - workerData: { - location, - }, + workerData: { location, workerStartup: true }, }); activeWorkers.push( new Promise((resolve, reject) => { - worker.once("error", (err) => { - worker.removeAllListeners("message"); - reject(err); - }); - worker.once("message", (message) => { - if (message !== CLOSED_DB_MESSAGE) { - return reject("did not receive correct message"); + worker.once("message", ({ message, error }) => { + if (message === WORKER_ERROR_MESSAGE) { + return reject(error); + } + if (message === CLOSED_DB_MESSAGE) { + return resolve(); } - worker.removeAllListeners("error"); - resolve(); + return reject("unexpected error\n>>> " + error); }); }) ); @@ -81,3 +88,62 @@ test("open/close mutex works as expected", async function (t) { await db1.close(); }); + +test("allow multi-threading by same process", async function (t) { + try { + const location = tempy.directory(); + const db = new ClassicLevel(location); + + const worker = new Worker(path.join(__dirname, "worker-test.js"), { + workerData: { location, readWrite: true }, + }); + + function cleanup(err) { + worker.removeAllListeners("message"); + worker.removeAllListeners("error"); + worker.terminate(); + if (err) { + throw err; + } + } + + worker.on("error", cleanup); + worker.on("message", ({ message, error }) => { + if (message === WORKER_ERROR_MESSAGE) { + cleanup(new Error(error)); + } + }); + + // Concurrently write keys to the db on both thread and wait + // until ready before attempting to concurrently read keys + const workerReady = new Promise((resolve) => { + let mainThreadReady = false; + worker.on("message", ({ message }) => { + if (message === WORKER_CREATING_KEYS_MESSAGE) { + createRandomKeys(db, MID_KEY, MAX_KEY).then(() => { + mainThreadReady = true; + }); + } else if (message === WORKER_READY_TO_READ_MESSAGE) { + const interval = setInterval(() => { + if (mainThreadReady) { + clearInterval(interval); + resolve(); + } + }, 100); + } + }); + }); + + await workerReady; + + // once db is seeded start reading keys from both threads + worker.postMessage({ message: START_READING_MESSAGE }); + await getRandomKeys(db, MIN_KEY, MAX_KEY); + await db.close(); + + t.end(); + } catch (error) { + t.fail(error.message); + t.end(); + } +}); diff --git a/test/worker-test.js b/test/worker-test.js index 469da7d..67fe889 100644 --- a/test/worker-test.js +++ b/test/worker-test.js @@ -2,15 +2,88 @@ const { parentPort, workerData } = require("worker_threads"); const { ClassicLevel } = require(".."); -const { CLOSED_DB_MESSAGE, getRandomValue } = require("./worker-utils"); +const { + MIN_KEY, + MID_KEY, + MAX_KEY, + CLOSED_DB_MESSAGE, + WORKER_CREATING_KEYS_MESSAGE, + WORKER_READY_TO_READ_MESSAGE, + WORKER_ERROR_MESSAGE, + START_READING_MESSAGE, + getRandomValue, + createRandomKeys, + getRandomKeys, +} = require("./worker-utils"); (async function main() { const db = new ClassicLevel(workerData.location); await db.open({ allowMultiThreading: true }); - setTimeout(() => { - db.close().then(() => { - parentPort.postMessage(CLOSED_DB_MESSAGE); + try { + /** + * test "open/close mutex works as expected" + */ + if (workerData.workerStartup) { + setTimeout(() => { + db.close() + .catch((err) => { + parentPort.postMessage({ + message: WORKER_ERROR_MESSAGE, + error: err.message, + }); + }) + .then(() => { + parentPort.postMessage({ + message: CLOSED_DB_MESSAGE, + }); + }); + }, getRandomValue(1, 100)); + return; + } + + /** + * test "allow multi-threading by same process" + */ + if (workerData.readWrite) { + parentPort.once("message", ({ message }) => { + if (message !== START_READING_MESSAGE) { + return parentPort.postMessage({ + message: WORKER_ERROR_MESSAGE, + error: `did not receive '${START_READING_MESSAGE}' message`, + }); + } + getRandomKeys(db, MIN_KEY, MAX_KEY) + .then(() => db.close()) + .catch((err) => + parentPort.postMessage({ + message: WORKER_ERROR_MESSAGE, + error: err.message, + }) + ); + }); + + parentPort.postMessage({ message: WORKER_CREATING_KEYS_MESSAGE }); + await createRandomKeys(db, MIN_KEY, MID_KEY).catch((err) => { + parentPort.removeAllListeners("message"); + parentPort.postMessage({ + message: WORKER_ERROR_MESSAGE, + error: err.message, + }); + }); + parentPort.postMessage({ message: WORKER_READY_TO_READ_MESSAGE }); + + return; + } + + parentPort.postMessage({ + message: WORKER_ERROR_MESSAGE, + error: "invalid workerData", + }); + } catch (err) { + parentPort.postMessage({ + message: WORKER_ERROR_MESSAGE, + error: err.message, }); - }, getRandomValue(1, 100)); + } })(); diff --git a/test/worker-utils.js b/test/worker-utils.js index 3bca598..486d1aa 100644 --- a/test/worker-utils.js +++ b/test/worker-utils.js @@ -1,6 +1,30 @@ +exports.TEST_INTERVAL_MS = 1000; + +exports.MIN_KEY = 1; +exports.MAX_KEY = 1000; +exports.MID_KEY = exports.MAX_KEY / 2; + exports.CLOSED_DB_MESSAGE = "closed db"; +exports.WORKER_CREATING_KEYS_MESSAGE = "worker creating keys"; +exports.WORKER_READY_TO_READ_MESSAGE = "worker ready to read keys"; +exports.WORKER_ERROR_MESSAGE = "worker error"; +exports.START_READING_MESSAGE = "start reading"; function getRandomValue(minValue, maxValue) { return Math.floor(Math.random() * (maxValue - minValue + 1) + minValue); } exports.getRandomValue = getRandomValue; + +exports.createRandomKeys = async (db, minKey, maxKey) => { + for (let i = minKey; i <= maxKey; i++) { + await db.put(`key${i}`, `value${i}`); + } +}; + +exports.getRandomKeys = async (db, minKey, maxKey) => { + const start = Date.now(); + while (Date.now() - start < exports.TEST_INTERVAL_MS) { + const randomKey = getRandomValue(minKey, maxKey); + await db.get(`key${randomKey}`); + } +};