Skip to content

Commit

Permalink
test: Add tests for ThreadSafeFunction's NonBlock function overloads (#…
Browse files Browse the repository at this point in the history
…1249)

* test: Add test coverage for Nonblock overloads for threadsafefunction
  • Loading branch information
JckXia authored Jan 24, 2023
1 parent fdc6263 commit 78b5a15
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 20 deletions.
39 changes: 36 additions & 3 deletions test/threadsafe_function/threadsafe_function.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,13 @@ static std::thread threads[2];
static ThreadSafeFunction s_tsfn;

struct ThreadSafeFunctionInfo {
enum CallType { DEFAULT, BLOCKING, NON_BLOCKING } type;
enum CallType {
DEFAULT,
BLOCKING,
NON_BLOCKING,
NON_BLOCKING_DEFAULT,
NON_BLOCKING_SINGLE_ARG
} type;
bool abort;
bool startSecondary;
FunctionReference jsFinalizeCallback;
Expand All @@ -42,18 +48,23 @@ static void DataSourceThread() {
if (s_tsfn.Acquire() != napi_ok) {
Error::Fatal("DataSourceThread", "ThreadSafeFunction.Acquire() failed");
}

threads[1] = std::thread(SecondaryThread);
}

bool queueWasFull = false;
bool queueWasClosing = false;

for (int index = ARRAY_LENGTH - 1; index > -1 && !queueWasClosing; index--) {
napi_status status = napi_generic_failure;

auto callback = [](Env env, Function jsCallback, int* data) {
jsCallback.Call({Number::New(env, *data)});
};

auto noArgCallback = [](Env env, Function jsCallback) {
jsCallback.Call({Number::New(env, 42)});
};

switch (info->type) {
case ThreadSafeFunctionInfo::DEFAULT:
status = s_tsfn.BlockingCall();
Expand All @@ -64,9 +75,17 @@ static void DataSourceThread() {
case ThreadSafeFunctionInfo::NON_BLOCKING:
status = s_tsfn.NonBlockingCall(&ints[index], callback);
break;
case ThreadSafeFunctionInfo::NON_BLOCKING_DEFAULT:
status = s_tsfn.NonBlockingCall();
break;

case ThreadSafeFunctionInfo::NON_BLOCKING_SINGLE_ARG:
status = s_tsfn.NonBlockingCall(noArgCallback);
break;
}

if (info->abort && info->type != ThreadSafeFunctionInfo::NON_BLOCKING) {
if (info->abort && (info->type == ThreadSafeFunctionInfo::BLOCKING ||
info->type == ThreadSafeFunctionInfo::DEFAULT)) {
// Let's make this thread really busy to give the main thread a chance to
// abort / close.
std::unique_lock<std::mutex> lk(info->protect);
Expand Down Expand Up @@ -176,6 +195,16 @@ static Value StartThreadNoNative(const CallbackInfo& info) {
return StartThreadInternal(info, ThreadSafeFunctionInfo::DEFAULT);
}

static Value StartThreadNonblockingNoNative(const CallbackInfo& info) {
return StartThreadInternal(info,
ThreadSafeFunctionInfo::NON_BLOCKING_DEFAULT);
}

static Value StartThreadNonBlockingSingleArg(const CallbackInfo& info) {
return StartThreadInternal(info,
ThreadSafeFunctionInfo::NON_BLOCKING_SINGLE_ARG);
}

Object InitThreadSafeFunction(Env env) {
for (size_t index = 0; index < ARRAY_LENGTH; index++) {
ints[index] = index;
Expand All @@ -186,8 +215,12 @@ Object InitThreadSafeFunction(Env env) {
exports["MAX_QUEUE_SIZE"] = Number::New(env, MAX_QUEUE_SIZE);
exports["startThread"] = Function::New(env, StartThread);
exports["startThreadNoNative"] = Function::New(env, StartThreadNoNative);
exports["startThreadNonblockingNoNative"] =
Function::New(env, StartThreadNonblockingNoNative);
exports["startThreadNonblocking"] =
Function::New(env, StartThreadNonblocking);
exports["startThreadNonblockSingleArg"] =
Function::New(env, StartThreadNonBlockingSingleArg);
exports["stopThread"] = Function::New(env, StopThread);
exports["release"] = Function::New(env, Release);

Expand Down
71 changes: 54 additions & 17 deletions test/threadsafe_function/threadsafe_function.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ const common = require('../common');

module.exports = common.runTest(test);

// Main test body
async function test (binding) {
const expectedArray = (function (arrayLength) {
const result = [];
Expand All @@ -14,6 +15,8 @@ async function test (binding) {
return result;
})(binding.threadsafe_function.ARRAY_LENGTH);

const expectedDefaultArray = Array.from({ length: binding.threadsafe_function.ARRAY_LENGTH }, (_, i) => 42);

function testWithJSMarshaller ({
threadStarter,
quitAfter,
Expand All @@ -31,7 +34,7 @@ async function test (binding) {
}), !!abort);
}
}, !!abort, !!launchSecondary, maxQueueSize);
if (threadStarter === 'startThreadNonblocking') {
if ((threadStarter === 'startThreadNonblocking' || threadStarter === 'startThreadNonblockSingleArg')) {
// Let's make this thread really busy for a short while to ensure that
// the queue fills and the thread receives a napi_queue_full.
const start = Date.now();
Expand All @@ -40,23 +43,28 @@ async function test (binding) {
});
}

await new Promise(function testWithoutJSMarshaller (resolve) {
let callCount = 0;
binding.threadsafe_function.startThreadNoNative(function testCallback () {
callCount++;
function testWithoutJSMarshallers (nativeFunction) {
return new Promise((resolve) => {
let callCount = 0;
nativeFunction(function testCallback () {
callCount++;

// The default call-into-JS implementation passes no arguments.
assert.strictEqual(arguments.length, 0);
if (callCount === binding.threadsafe_function.ARRAY_LENGTH) {
setImmediate(() => {
binding.threadsafe_function.stopThread(common.mustCall(() => {
resolve();
}), false);
});
}
}, false /* abort */, false /* launchSecondary */,
binding.threadsafe_function.MAX_QUEUE_SIZE);
});
}

// The default call-into-JS implementation passes no arguments.
assert.strictEqual(arguments.length, 0);
if (callCount === binding.threadsafe_function.ARRAY_LENGTH) {
setImmediate(() => {
binding.threadsafe_function.stopThread(common.mustCall(() => {
resolve();
}), false);
});
}
}, false /* abort */, false /* launchSecondary */,
binding.threadsafe_function.MAX_QUEUE_SIZE);
});
await testWithoutJSMarshallers(binding.threadsafe_function.startThreadNoNative);
await testWithoutJSMarshallers(binding.threadsafe_function.startThreadNonblockingNoNative);

// Start the thread in blocking mode, and assert that all values are passed.
// Quit after it's done.
Expand Down Expand Up @@ -124,6 +132,15 @@ async function test (binding) {
expectedArray
);

assert.deepStrictEqual(
await testWithJSMarshaller({
threadStarter: 'startThreadNonblockSingleArg',
maxQueueSize: binding.threadsafe_function.MAX_QUEUE_SIZE,
quitAfter: 1
}),
expectedDefaultArray
);

// Start the thread in blocking mode, and assert that all values are passed.
// Quit early, but let the thread finish. Launch a secondary thread to test
// the reference counter incrementing functionality.
Expand All @@ -150,6 +167,16 @@ async function test (binding) {
expectedArray
);

assert.deepStrictEqual(
await testWithJSMarshaller({
threadStarter: 'startThreadNonblockSingleArg',
maxQueueSize: binding.threadsafe_function.MAX_QUEUE_SIZE,
quitAfter: 1,
launchSecondary: true
}),
expectedDefaultArray
);

// Start the thread in blocking mode, and assert that it could not finish.
// Quit early by aborting.
assert.strictEqual(
Expand Down Expand Up @@ -185,4 +212,14 @@ async function test (binding) {
})).indexOf(0),
-1
);

assert.strictEqual(
(await testWithJSMarshaller({
threadStarter: 'startThreadNonblockSingleArg',
quitAfter: 1,
maxQueueSize: binding.threadsafe_function.MAX_QUEUE_SIZE,
abort: true
})).indexOf(0),
-1
);
}

0 comments on commit 78b5a15

Please sign in to comment.