Skip to content

Commit

Permalink
test: clean up unit tests for worker
Browse files Browse the repository at this point in the history
  • Loading branch information
matthewkeil committed Nov 3, 2023
1 parent 23a1f20 commit 96490ac
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 54 deletions.
97 changes: 68 additions & 29 deletions test/multithreading-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,40 +5,79 @@ const tempy = require("tempy");
const path = require("path");
const { Worker } = require("worker_threads");
const { ClassicLevel } = require("..");
const { createRandomKeys, getRandomKeys } = require("./worker-utils");
const { CLOSED_DB_MESSAGE } = require("./worker-utils");

test("allow multi-threading by same process", async function (t) {
/**
* Makes sure that the allowMultiThreading flag is working as expected
*/
test("check allowMultiThreading flag works as expected", async function (t) {
t.plan(5);
const location = tempy.directory();
const db1 = new ClassicLevel(location);
await db1.open({ location });
t.is(db1.location, location);

const db2 = new ClassicLevel(location);
await db2.open({ location, allowMultiThreading: true });
t.is(db2.location, location);

const db3 = new ClassicLevel(location);
try {
await db3.open({ location, allowMultiThreading: false });
} catch (err) {
t.is(err.code, "LEVEL_DATABASE_NOT_OPEN", "third instance failed to open");
t.is(err.cause.code, "LEVEL_LOCKED", "third instance got lock error");
}

await db1.close();
await db2.close();

const db4 = new ClassicLevel(location);
await db4.open({ location, allowMultiThreading: false });
t.is(db4.location, location);
await db4.close();
});

/**
* Tests for interleaved opening and closing of the database to check
* that the mutex for guarding the handles is working as expected
*/
test("open/close mutex works as expected", async function (t) {
t.plan(2);
const location = tempy.directory();
const db = new ClassicLevel(location);
await db.open();
await createRandomKeys(db);

const worker = new Worker(path.join(__dirname, "worker-test.js"), {
workerData: { location },
});

function onMessage(_) {
getRandomKeys(db, "main").catch((err) => {
worker.removeListener("error", onError);
onError(err);
const db1 = new ClassicLevel(location);
await db1.open({ location });
t.is(db1.location, location);

const activeWorkers = [];

for (let i = 0; i < 100; i++) {
const worker = new Worker(path.join(__dirname, "worker-test.js"), {
workerData: {
location,
},
});
}
worker.on("message", onMessage);

function onError(err) {
worker.removeListener("message", onMessage);
worker.removeListener("exit", onExit);
t.ifError(err, "worker error");
db.close(t.ifError.bind(t));
activeWorkers.push(
new Promise((resolve, reject) => {
worker.once("error", (err) => {
worker.removeAllListeners("message");
reject(err);
});
worker.once("message", (message) => {
if (message !== CLOSED_DB_MESSAGE) {
return reject("did not receive correct message");
}
worker.removeAllListeners("error");
resolve();
});
})
);
}
worker.once("error", onError);

function onExit(code) {
worker.removeListener("message", onMessage);
worker.removeListener("error", onError);
t.equal(code, 0, 'child exited normally');
db.close(t.ifError.bind(t));
}
worker.once("exit", onExit);
const results = await Promise.allSettled(activeWorkers);
const rejected = results.filter((res) => res.status === "rejected");
t.is(rejected.length, 0);

await db1.close();
});
11 changes: 6 additions & 5 deletions test/worker-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@

const { parentPort, workerData } = require("worker_threads");
const { ClassicLevel } = require("..");
const { getRandomKeys } = require("./worker-utils");
const { CLOSED_DB_MESSAGE, getRandomValue } = require("./worker-utils");

(async function main() {
const db = new ClassicLevel(workerData.location);
await db.open({ allowMultiThreading: true });

parentPort.postMessage("starting");

await getRandomKeys(db, "worker");
await db.close();
setTimeout(() => {
db.close().then(() => {
parentPort.postMessage(CLOSED_DB_MESSAGE);
});
}, getRandomValue(1, 100));
})();
25 changes: 5 additions & 20 deletions test/worker-utils.js
Original file line number Diff line number Diff line change
@@ -1,21 +1,6 @@
const MIN_KEY = 1;
const MAX_KEY = 100;
const TEST_INTERVAL_MS = 1000;
exports.CLOSED_DB_MESSAGE = "closed db";

exports.createRandomKeys = async (db) => {
for (let i = MIN_KEY; i <= MAX_KEY; i++) {
await db.put(`key${i}`, `value${i}`);
}
};

exports.getRandomKeys = async (db, thread) => {
const start = Date.now();
while (Date.now() - start < TEST_INTERVAL_MS) {
const randomKey = Math.floor(
Math.random() * (MAX_KEY - MIN_KEY + 1) + MIN_KEY
);
// console.log(thread + ": got " +
await db.get(`key${randomKey}`);
// );
}
};
function getRandomValue(minValue, maxValue) {
return Math.floor(Math.random() * (maxValue - minValue + 1) + minValue);
}
exports.getRandomValue = getRandomValue;

0 comments on commit 96490ac

Please sign in to comment.