Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: Fix intermittent TSFN crashes #974

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion test/binding.cc
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ Object Init(Env env, Object exports) {
exports.Set("threadsafe_function_ptr", InitThreadSafeFunctionPtr(env));
exports.Set("threadsafe_function_sum", InitThreadSafeFunctionSum(env));
exports.Set("threadsafe_function_unref", InitThreadSafeFunctionUnref(env));
exports.Set("threadsafe_function", InitTypedThreadSafeFunction(env));
exports.Set("threadsafe_function", InitThreadSafeFunction(env));
exports.Set("typed_threadsafe_function_ctx",
InitTypedThreadSafeFunctionCtx(env));
exports.Set("typed_threadsafe_function_existing_tsfn",
Expand Down
24 changes: 18 additions & 6 deletions test/threadsafe_function/threadsafe_function.cc
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>
#include "napi.h"

Expand All @@ -22,6 +24,9 @@ struct ThreadSafeFunctionInfo {
bool startSecondary;
FunctionReference jsFinalizeCallback;
uint32_t maxQueueSize;
bool closeCalledFromJs;
std::mutex protect;
std::condition_variable signal;
} tsfnInfo;

// Thread data to transmit to JS
Expand Down Expand Up @@ -65,12 +70,13 @@ static void DataSourceThread() {
break;
}

if (info->maxQueueSize == 0) {
// Let's make this thread really busy for 200 ms to give the main thread a
// chance to abort.
auto start = std::chrono::high_resolution_clock::now();
constexpr auto MS_200 = std::chrono::milliseconds(200);
for (; std::chrono::high_resolution_clock::now() - start < MS_200;);
if (info->abort && info->type != ThreadSafeFunctionInfo::NON_BLOCKING) {
// Let's make this thread really busy to give the main thread a chance to
// abort / close.
std::unique_lock<std::mutex> lk(info->protect);
while (!info->closeCalledFromJs) {
info->signal.wait(lk);
}
}

switch (status) {
Expand Down Expand Up @@ -112,6 +118,11 @@ static Value StopThread(const CallbackInfo& info) {
} else {
tsfn.Release();
}
{
std::lock_guard<std::mutex> _(tsfnInfo.protect);
tsfnInfo.closeCalledFromJs = true;
tsfnInfo.signal.notify_one();
}
return Value();
}

Expand All @@ -134,6 +145,7 @@ static Value StartThreadInternal(const CallbackInfo& info,
tsfnInfo.abort = info[1].As<Boolean>();
tsfnInfo.startSecondary = info[2].As<Boolean>();
tsfnInfo.maxQueueSize = info[3].As<Number>().Uint32Value();
tsfnInfo.closeCalledFromJs = false;

tsfn = ThreadSafeFunction::New(info.Env(), info[0].As<Function>(),
"Test", tsfnInfo.maxQueueSize, 2, &tsfnInfo, JoinTheThreads, threads);
Expand Down
42 changes: 28 additions & 14 deletions test/typed_threadsafe_function/typed_threadsafe_function.cc
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>
#include "napi.h"

Expand All @@ -17,6 +19,9 @@ static struct ThreadSafeFunctionInfo {
bool startSecondary;
FunctionReference jsFinalizeCallback;
uint32_t maxQueueSize;
bool closeCalledFromJs;
std::mutex protect;
std::condition_variable signal;
} tsfnInfo;

static void TSFNCallJS(Env env,
Expand All @@ -42,7 +47,7 @@ static int ints[ARRAY_LENGTH];

static void SecondaryThread() {
if (tsfn.Release() != napi_ok) {
Error::Fatal("SecondaryThread", "ThreadSafeFunction.Release() failed");
Error::Fatal("TypedSecondaryThread", "ThreadSafeFunction.Release() failed");
}
}

Expand All @@ -52,7 +57,8 @@ static void DataSourceThread() {

if (info->startSecondary) {
if (tsfn.Acquire() != napi_ok) {
Error::Fatal("DataSourceThread", "ThreadSafeFunction.Acquire() failed");
Error::Fatal("TypedDataSourceThread",
"ThreadSafeFunction.Acquire() failed");
}

threads[1] = std::thread(SecondaryThread);
Expand All @@ -75,13 +81,13 @@ static void DataSourceThread() {
break;
}

if (info->maxQueueSize == 0) {
// Let's make this thread really busy for 200 ms to give the main thread a
// chance to abort.
auto start = std::chrono::high_resolution_clock::now();
constexpr auto MS_200 = std::chrono::milliseconds(200);
for (; std::chrono::high_resolution_clock::now() - start < MS_200;)
;
if (info->abort && info->type != ThreadSafeFunctionInfo::NON_BLOCKING) {
// Let's make this thread really busy to give the main thread a chance to
// abort / close.
std::unique_lock<std::mutex> lk(info->protect);
while (!info->closeCalledFromJs) {
info->signal.wait(lk);
}
}

switch (status) {
Expand All @@ -98,20 +104,22 @@ static void DataSourceThread() {
break;

default:
Error::Fatal("DataSourceThread", "ThreadSafeFunction.*Call() failed");
Error::Fatal("TypedDataSourceThread",
"ThreadSafeFunction.*Call() failed");
}
}

if (info->type == ThreadSafeFunctionInfo::NON_BLOCKING && !queueWasFull) {
Error::Fatal("DataSourceThread", "Queue was never full");
Error::Fatal("TypedDataSourceThread", "Queue was never full");
}

if (info->abort && !queueWasClosing) {
Error::Fatal("DataSourceThread", "Queue was never closing");
Error::Fatal("TypedDataSourceThread", "Queue was never closing");
}

if (!queueWasClosing && tsfn.Release() != napi_ok) {
Error::Fatal("DataSourceThread", "ThreadSafeFunction.Release() failed");
Error::Fatal("TypedDataSourceThread",
"ThreadSafeFunction.Release() failed");
}
}

Expand All @@ -123,6 +131,11 @@ static Value StopThread(const CallbackInfo& info) {
} else {
tsfn.Release();
}
{
std::lock_guard<std::mutex> _(tsfnInfo.protect);
tsfnInfo.closeCalledFromJs = true;
tsfnInfo.signal.notify_one();
}
return Value();
}

Expand All @@ -145,6 +158,7 @@ static Value StartThreadInternal(const CallbackInfo& info,
tsfnInfo.abort = info[1].As<Boolean>();
tsfnInfo.startSecondary = info[2].As<Boolean>();
tsfnInfo.maxQueueSize = info[3].As<Number>().Uint32Value();
tsfnInfo.closeCalledFromJs = false;

tsfn = TSFN::New(info.Env(),
info[0].As<Function>(),
Expand All @@ -163,7 +177,7 @@ static Value StartThreadInternal(const CallbackInfo& info,

static Value Release(const CallbackInfo& /* info */) {
if (tsfn.Release() != napi_ok) {
Error::Fatal("Release", "ThreadSafeFunction.Release() failed");
Error::Fatal("Release", "TypedThreadSafeFunction.Release() failed");
}
return Value();
}
Expand Down