test: add read/write multithreading case
matthewkeil committed Nov 5, 2023
1 parent 96490ac commit 6d46790
Showing 3 changed files with 181 additions and 18 deletions.
test/multithreading-test.js (92 changes: 79 additions & 13 deletions)
@@ -5,7 +5,18 @@ const tempy = require("tempy");
 const path = require("path");
 const { Worker } = require("worker_threads");
 const { ClassicLevel } = require("..");
-const { CLOSED_DB_MESSAGE } = require("./worker-utils");
+const {
+  MIN_KEY,
+  MID_KEY,
+  MAX_KEY,
+  CLOSED_DB_MESSAGE,
+  WORKER_CREATING_KEYS_MESSAGE,
+  WORKER_READY_TO_READ_MESSAGE,
+  WORKER_ERROR_MESSAGE,
+  START_READING_MESSAGE,
+  createRandomKeys,
+  getRandomKeys,
+} = require("./worker-utils");
 
 /**
  * Makes sure that the allowMultiThreading flag is working as expected
@@ -53,23 +64,19 @@ test("open/close mutex works as expected", async function (t) {
 
   for (let i = 0; i < 100; i++) {
     const worker = new Worker(path.join(__dirname, "worker-test.js"), {
-      workerData: {
-        location,
-      },
+      workerData: { location, workerStartup: true },
     });
 
     activeWorkers.push(
       new Promise((resolve, reject) => {
-        worker.once("error", (err) => {
-          worker.removeAllListeners("message");
-          reject(err);
-        });
-        worker.once("message", (message) => {
-          if (message !== CLOSED_DB_MESSAGE) {
-            return reject("did not receive correct message");
+        worker.once("message", ({ message, error }) => {
+          if (message === WORKER_ERROR_MESSAGE) {
+            return reject(error);
+          }
+          if (message === CLOSED_DB_MESSAGE) {
+            return resolve();
           }
-          worker.removeAllListeners("error");
-          resolve();
+          return reject("unexpected error\n>>> " + error);
        });
       })
     );
@@ -81,3 +88,62 @@
 
   await db1.close();
 });
+
+test("allow multi-threading by same process", async function (t) {
+  try {
+    const location = tempy.directory();
+    const db = new ClassicLevel(location);
+
+    const worker = new Worker(path.join(__dirname, "worker-test.js"), {
+      workerData: { location, readWrite: true },
+    });
+
+    function cleanup(err) {
+      worker.removeAllListeners("message");
+      worker.removeAllListeners("error");
+      worker.terminate();
+      if (err) {
+        throw err;
+      }
+    }
+
+    worker.on("error", cleanup);
+    worker.on("message", ({ message, error }) => {
+      if (message === WORKER_ERROR_MESSAGE) {
+        cleanup(new Error(error));
+      }
+    });
+
+    // Concurrently write keys to the db on both threads and wait
+    // until ready before attempting to concurrently read keys
+    const workerReady = new Promise((resolve) => {
+      let mainThreadReady = false;
+      worker.on("message", ({ message }) => {
+        if (message === WORKER_CREATING_KEYS_MESSAGE) {
+          createRandomKeys(db, MID_KEY, MAX_KEY).then(() => {
+            mainThreadReady = true;
+          });
+        } else if (message === WORKER_READY_TO_READ_MESSAGE) {
+          const interval = setInterval(() => {
+            if (mainThreadReady) {
+              clearInterval(interval);
+              resolve();
+            }
+          }, 100);
+        }
+      });
+    });
+
+    await workerReady;
+
+    // once db is seeded start reading keys from both threads
+    worker.postMessage({ message: START_READING_MESSAGE });
+    await getRandomKeys(db, MIN_KEY, MAX_KEY);
+    await db.close();
+
+    t.end();
+  } catch (error) {
+    t.fail(error.message);
+    t.end();
+  }
+});
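
The coordination in this new test is easier to follow with the database calls stripped away. The following is a condensed, illustrative restatement of the same handshake and is not part of the commit: seedKeys and readKeys are hypothetical stand-ins for createRandomKeys and getRandomKeys, and the message strings are the values of the constants defined in test/worker-utils.js.

// handshake-sketch.js (illustrative only, not part of the commit)
const { Worker, isMainThread, parentPort } = require("worker_threads");

// Hypothetical stand-ins for createRandomKeys / getRandomKeys.
const seedKeys = async (who) => console.log(who, "seeding its half of the keys");
const readKeys = async (who) => console.log(who, "reading random keys");

if (isMainThread) {
  const worker = new Worker(__filename);
  let mainThreadReady = false;

  const workerReady = new Promise((resolve) => {
    worker.on("message", ({ message }) => {
      if (message === "worker creating keys") {
        // Both threads seed their half of the keyspace concurrently.
        seedKeys("main").then(() => {
          mainThreadReady = true;
        });
      } else if (message === "worker ready to read keys") {
        // Release the worker only once this thread has finished seeding too.
        const interval = setInterval(() => {
          if (mainThreadReady) {
            clearInterval(interval);
            resolve();
          }
        }, 100);
      }
    });
  });

  workerReady.then(() => {
    worker.postMessage({ message: "start reading" });
    return readKeys("main");
  });
} else {
  parentPort.once("message", ({ message }) => {
    if (message === "start reading") readKeys("worker");
  });
  parentPort.postMessage({ message: "worker creating keys" });
  seedKeys("worker").then(() =>
    parentPort.postMessage({ message: "worker ready to read keys" })
  );
}
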
test/worker-test.js (83 changes: 78 additions & 5 deletions)
@@ -2,15 +2,88 @@
 
 const { parentPort, workerData } = require("worker_threads");
 const { ClassicLevel } = require("..");
-const { CLOSED_DB_MESSAGE, getRandomValue } = require("./worker-utils");
+const {
+  MIN_KEY,
+  MID_KEY,
+  MAX_KEY,
+  CLOSED_DB_MESSAGE,
+  WORKER_CREATING_KEYS_MESSAGE,
+  WORKER_READY_TO_READ_MESSAGE,
+  WORKER_ERROR_MESSAGE,
+  START_READING_MESSAGE,
+  getRandomValue,
+  createRandomKeys,
+  getRandomKeys,
+} = require("./worker-utils");
 
 (async function main() {
   const db = new ClassicLevel(workerData.location);
   await db.open({ allowMultiThreading: true });
 
-  setTimeout(() => {
-    db.close().then(() => {
-      parentPort.postMessage(CLOSED_DB_MESSAGE);
+  try {
+    /**
+     * test "open/close mutex works as expected"
+     */
+    if (workerData.workerStartup) {
+      setTimeout(() => {
+        db.close()
+          .catch((err) => {
+            parentPort.postMessage({
+              message: WORKER_ERROR_MESSAGE,
+              error: err.message,
+            });
+          })
+          .then(() => {
+            parentPort.postMessage({
+              message: CLOSED_DB_MESSAGE,
+            });
+          });
+      }, getRandomValue(1, 100));
+      return;
+    }
+
+    /**
+     * test "allow multi-threading by same process"
+     */
+    if (workerData.readWrite) {
+      parentPort.once("message", ({ message }) => {
+        if (message !== START_READING_MESSAGE) {
+          return parentPort.postMessage({
+            message: WORKER_ERROR_MESSAGE,
+            error: `did not receive '${START_READING_MESSAGE}' message`,
+          });
+        }
+        getRandomKeys(db, MIN_KEY, MAX_KEY)
+          .then(() => db.close())
+          .catch((err) =>
+            parentPort.postMessage({
+              message: WORKER_ERROR_MESSAGE,
+              error: err.message,
+            })
+          );
+      });
+
+      parentPort.postMessage({ message: WORKER_CREATING_KEYS_MESSAGE });
+      await createRandomKeys(db, MIN_KEY, MID_KEY).catch((err) => {
+        parentPort.removeAllListeners("message");
+        parentPort.postMessage({
+          message: WORKER_ERROR_MESSAGE,
+          error: err.message,
+        });
+      });
+      parentPort.postMessage({ message: WORKER_READY_TO_READ_MESSAGE });
+
+      return;
+    }
+
+    parentPort.postMessage({
+      message: WORKER_ERROR_MESSAGE,
+      error: "invalid workerData",
+    });
+  } catch (err) {
+    parentPort.postMessage({
+      message: WORKER_ERROR_MESSAGE,
+      error: err.message,
     });
-  }, getRandomValue(1, 100));
+  }
 })();
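
One structural point worth noting: the worker never lets an exception escape the thread. Every failure path, whether from close(), from seeding, or from an unexpected message, is converted into a WORKER_ERROR_MESSAGE post so the parent thread decides how to fail the test. Factored out, the pattern looks roughly like the sketch below; runGuarded is a hypothetical helper name, not something the commit adds.

// error-reporting sketch (illustrative only, not part of the commit)
const { parentPort } = require("worker_threads");
const { WORKER_ERROR_MESSAGE } = require("./worker-utils");

// Run an async task; report failures to the parent instead of throwing.
async function runGuarded(task) {
  try {
    await task();
  } catch (err) {
    parentPort.postMessage({ message: WORKER_ERROR_MESSAGE, error: err.message });
  }
}

// Usage inside a worker, e.g.: await runGuarded(() => db.close());
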
test/worker-utils.js (24 changes: 24 additions & 0 deletions)
@@ -1,6 +1,30 @@
exports.TEST_INTERVAL_MS = 1000;

exports.MIN_KEY = 1;
exports.MAX_KEY = 1000;
exports.MID_KEY = exports.MAX_KEY / 2;

exports.CLOSED_DB_MESSAGE = "closed db";
exports.WORKER_CREATING_KEYS_MESSAGE = "worker creating keys";
exports.WORKER_READY_TO_READ_MESSAGE = "worker ready to read keys";
exports.WORKER_ERROR_MESSAGE = "worker error";
exports.START_READING_MESSAGE = "start reading";

function getRandomValue(minValue, maxValue) {
  return Math.floor(Math.random() * (maxValue - minValue + 1) + minValue);
}
exports.getRandomValue = getRandomValue;

exports.createRandomKeys = async (db, minKey, maxKey) => {
  for (let i = minKey; i <= maxKey; i++) {
    await db.put(`key${i}`, `value${i}`);
  }
};

exports.getRandomKeys = async (db, minKey, maxKey) => {
  const start = Date.now();
  while (Date.now() - start < exports.TEST_INTERVAL_MS) {
    const randomKey = getRandomValue(minKey, maxKey);
    await db.get(`key${randomKey}`);
  }
};
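
Taken together, these helpers split the keyspace between the two threads: the worker seeds key1 through key500 while the main thread seeds key500 through key1000 (the inclusive MID_KEY boundary is simply written by both), and once both halves exist each thread issues random gets across the full range for TEST_INTERVAL_MS. A single-threaded usage sketch, not part of the commit and assuming it sits next to worker-utils.js in the test folder:

// usage sketch (illustrative only)
const {
  MIN_KEY,
  MID_KEY,
  MAX_KEY,
  createRandomKeys,
  getRandomKeys,
} = require("./worker-utils");
const { ClassicLevel } = require("..");

async function demo() {
  const db = new ClassicLevel("./tmp-demo-db"); // hypothetical location
  await createRandomKeys(db, MIN_KEY, MID_KEY); // writes key1 .. key500
  await createRandomKeys(db, MID_KEY, MAX_KEY); // writes key500 .. key1000
  await getRandomKeys(db, MIN_KEY, MAX_KEY); // random reads for TEST_INTERVAL_MS
  await db.close();
}

demo();
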
