diff --git a/test/threadsafe_function/threadsafe_function.cc b/test/threadsafe_function/threadsafe_function.cc index 701f71fe1..8902b73c5 100644 --- a/test/threadsafe_function/threadsafe_function.cc +++ b/test/threadsafe_function/threadsafe_function.cc @@ -15,7 +15,13 @@ static std::thread threads[2]; static ThreadSafeFunction s_tsfn; struct ThreadSafeFunctionInfo { - enum CallType { DEFAULT, BLOCKING, NON_BLOCKING } type; + enum CallType { + DEFAULT, + BLOCKING, + NON_BLOCKING, + NON_BLOCKING_DEFAULT, + NON_BLOCKING_SINGLE_ARG + } type; bool abort; bool startSecondary; FunctionReference jsFinalizeCallback; @@ -42,18 +48,23 @@ static void DataSourceThread() { if (s_tsfn.Acquire() != napi_ok) { Error::Fatal("DataSourceThread", "ThreadSafeFunction.Acquire() failed"); } - threads[1] = std::thread(SecondaryThread); } bool queueWasFull = false; bool queueWasClosing = false; + for (int index = ARRAY_LENGTH - 1; index > -1 && !queueWasClosing; index--) { napi_status status = napi_generic_failure; + auto callback = [](Env env, Function jsCallback, int* data) { jsCallback.Call({Number::New(env, *data)}); }; + auto noArgCallback = [](Env env, Function jsCallback) { + jsCallback.Call({Number::New(env, 42)}); + }; + switch (info->type) { case ThreadSafeFunctionInfo::DEFAULT: status = s_tsfn.BlockingCall(); @@ -64,9 +75,17 @@ static void DataSourceThread() { case ThreadSafeFunctionInfo::NON_BLOCKING: status = s_tsfn.NonBlockingCall(&ints[index], callback); break; + case ThreadSafeFunctionInfo::NON_BLOCKING_DEFAULT: + status = s_tsfn.NonBlockingCall(); + break; + + case ThreadSafeFunctionInfo::NON_BLOCKING_SINGLE_ARG: + status = s_tsfn.NonBlockingCall(noArgCallback); + break; } - if (info->abort && info->type != ThreadSafeFunctionInfo::NON_BLOCKING) { + if (info->abort && (info->type == ThreadSafeFunctionInfo::BLOCKING || + info->type == ThreadSafeFunctionInfo::DEFAULT)) { // Let's make this thread really busy to give the main thread a chance to // abort / close. std::unique_lock lk(info->protect); @@ -176,6 +195,16 @@ static Value StartThreadNoNative(const CallbackInfo& info) { return StartThreadInternal(info, ThreadSafeFunctionInfo::DEFAULT); } +static Value StartThreadNonblockingNoNative(const CallbackInfo& info) { + return StartThreadInternal(info, + ThreadSafeFunctionInfo::NON_BLOCKING_DEFAULT); +} + +static Value StartThreadNonBlockingSingleArg(const CallbackInfo& info) { + return StartThreadInternal(info, + ThreadSafeFunctionInfo::NON_BLOCKING_SINGLE_ARG); +} + Object InitThreadSafeFunction(Env env) { for (size_t index = 0; index < ARRAY_LENGTH; index++) { ints[index] = index; @@ -186,8 +215,12 @@ Object InitThreadSafeFunction(Env env) { exports["MAX_QUEUE_SIZE"] = Number::New(env, MAX_QUEUE_SIZE); exports["startThread"] = Function::New(env, StartThread); exports["startThreadNoNative"] = Function::New(env, StartThreadNoNative); + exports["startThreadNonblockingNoNative"] = + Function::New(env, StartThreadNonblockingNoNative); exports["startThreadNonblocking"] = Function::New(env, StartThreadNonblocking); + exports["startThreadNonblockSingleArg"] = + Function::New(env, StartThreadNonBlockingSingleArg); exports["stopThread"] = Function::New(env, StopThread); exports["release"] = Function::New(env, Release); diff --git a/test/threadsafe_function/threadsafe_function.js b/test/threadsafe_function/threadsafe_function.js index 573c33649..b29dfadb1 100644 --- a/test/threadsafe_function/threadsafe_function.js +++ b/test/threadsafe_function/threadsafe_function.js @@ -5,6 +5,7 @@ const common = require('../common'); module.exports = common.runTest(test); +// Main test body async function test (binding) { const expectedArray = (function (arrayLength) { const result = []; @@ -14,6 +15,8 @@ async function test (binding) { return result; })(binding.threadsafe_function.ARRAY_LENGTH); + const expectedDefaultArray = Array.from({ length: binding.threadsafe_function.ARRAY_LENGTH }, (_, i) => 42); + function testWithJSMarshaller ({ threadStarter, quitAfter, @@ -31,7 +34,7 @@ async function test (binding) { }), !!abort); } }, !!abort, !!launchSecondary, maxQueueSize); - if (threadStarter === 'startThreadNonblocking') { + if ((threadStarter === 'startThreadNonblocking' || threadStarter === 'startThreadNonblockSingleArg')) { // Let's make this thread really busy for a short while to ensure that // the queue fills and the thread receives a napi_queue_full. const start = Date.now(); @@ -40,23 +43,28 @@ async function test (binding) { }); } - await new Promise(function testWithoutJSMarshaller (resolve) { - let callCount = 0; - binding.threadsafe_function.startThreadNoNative(function testCallback () { - callCount++; + function testWithoutJSMarshallers (nativeFunction) { + return new Promise((resolve) => { + let callCount = 0; + nativeFunction(function testCallback () { + callCount++; + + // The default call-into-JS implementation passes no arguments. + assert.strictEqual(arguments.length, 0); + if (callCount === binding.threadsafe_function.ARRAY_LENGTH) { + setImmediate(() => { + binding.threadsafe_function.stopThread(common.mustCall(() => { + resolve(); + }), false); + }); + } + }, false /* abort */, false /* launchSecondary */, + binding.threadsafe_function.MAX_QUEUE_SIZE); + }); + } - // The default call-into-JS implementation passes no arguments. - assert.strictEqual(arguments.length, 0); - if (callCount === binding.threadsafe_function.ARRAY_LENGTH) { - setImmediate(() => { - binding.threadsafe_function.stopThread(common.mustCall(() => { - resolve(); - }), false); - }); - } - }, false /* abort */, false /* launchSecondary */, - binding.threadsafe_function.MAX_QUEUE_SIZE); - }); + await testWithoutJSMarshallers(binding.threadsafe_function.startThreadNoNative); + await testWithoutJSMarshallers(binding.threadsafe_function.startThreadNonblockingNoNative); // Start the thread in blocking mode, and assert that all values are passed. // Quit after it's done. @@ -124,6 +132,15 @@ async function test (binding) { expectedArray ); + assert.deepStrictEqual( + await testWithJSMarshaller({ + threadStarter: 'startThreadNonblockSingleArg', + maxQueueSize: binding.threadsafe_function.MAX_QUEUE_SIZE, + quitAfter: 1 + }), + expectedDefaultArray + ); + // Start the thread in blocking mode, and assert that all values are passed. // Quit early, but let the thread finish. Launch a secondary thread to test // the reference counter incrementing functionality. @@ -150,6 +167,16 @@ async function test (binding) { expectedArray ); + assert.deepStrictEqual( + await testWithJSMarshaller({ + threadStarter: 'startThreadNonblockSingleArg', + maxQueueSize: binding.threadsafe_function.MAX_QUEUE_SIZE, + quitAfter: 1, + launchSecondary: true + }), + expectedDefaultArray + ); + // Start the thread in blocking mode, and assert that it could not finish. // Quit early by aborting. assert.strictEqual( @@ -185,4 +212,14 @@ async function test (binding) { })).indexOf(0), -1 ); + + assert.strictEqual( + (await testWithJSMarshaller({ + threadStarter: 'startThreadNonblockSingleArg', + quitAfter: 1, + maxQueueSize: binding.threadsafe_function.MAX_QUEUE_SIZE, + abort: true + })).indexOf(0), + -1 + ); }