Commit

update

toyobayashi committed Aug 8, 2024
1 parent a0d2888 commit a31b92f
Showing 2 changed files with 50 additions and 38 deletions.
2 changes: 1 addition & 1 deletion packages/emnapi/src/uv/threadpool.c
@@ -19,7 +19,7 @@
  * IN THE SOFTWARE.
  */

-// from libuv 1.43.0
+// from libuv 1.48.0

 #if defined(__EMSCRIPTEN_PTHREADS__) || defined(_REENTRANT)

86 changes: 49 additions & 37 deletions packages/emnapi/src/uv/unix/async.c
@@ -27,24 +27,11 @@
#include "uv.h"
#include "internal.h"

#include <stdatomic.h>
#include <stdlib.h>
#include <sched.h>
#include "emnapi_common.h"

#if defined(__clang__) || \
defined(__GNUC__) || \
defined(__INTEL_COMPILER)
# define UV_UNUSED(declaration) __attribute__((unused)) declaration
#else
# define UV_UNUSED(declaration) declaration
#endif

UV_UNUSED(static int cmpxchgi(int* ptr, int oldval, int newval));

UV_UNUSED(static int cmpxchgi(int* ptr, int oldval, int newval)) {
return __sync_val_compare_and_swap(ptr, oldval, newval);
}

#if EMNAPI_USE_PROXYING
#include <emscripten/threading.h>
#include <emscripten/proxying.h>
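
For reference, the deleted cmpxchgi() wrapped GCC's __sync_val_compare_and_swap builtin: a value-returning compare-and-swap. A minimal standalone sketch of the same operation using the C11 <stdatomic.h> API that this file now relies on (the name cmpxchgi_c11 is illustrative, not part of the source):

#include <stdatomic.h>

/* If *ptr == oldval, atomically store newval; either way, return the
 * value *ptr held before the operation (the removed cmpxchgi contract). */
static int cmpxchgi_c11(_Atomic int* ptr, int oldval, int newval) {
  int expected = oldval;
  /* On failure, atomic_compare_exchange_strong writes the current value
   * of *ptr back into 'expected'. */
  atomic_compare_exchange_strong(ptr, &expected, newval);
  return expected;
}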
@@ -77,6 +64,9 @@ void _emnapi_destroy_proxying_queue(uv_loop_t* loop) {}

 #endif

+static void uv__async_send(uv_loop_t* loop);
+static void uv__cpu_relax(void);
+
 int uv_async_init(uv_loop_t* loop, uv_async_t* handle, uv_async_cb async_cb) {
   uv__handle_init(loop, (uv_handle_t*)handle, UV_ASYNC);
   handle->async_cb = async_cb;
@@ -89,26 +79,28 @@ int uv_async_init(uv_loop_t* loop, uv_async_t* handle, uv_async_cb async_cb) {
 }

 /* Only call this from the event loop thread. */
-static int uv__async_spin(uv_async_t* handle) {
+static void uv__async_spin(uv_async_t* handle) {
+  _Atomic int* pending;
+  _Atomic int* busy;
   int i;
-  int rc;

+  pending = (_Atomic int*) &handle->pending;
+  busy = (_Atomic int*) &handle->u.fd;
+
+  /* Set the pending flag first, so no new events will be added by other
+   * threads after this function returns. */
+  atomic_store(pending, 1);
+
   for (;;) {
-    /* 997 is not completely chosen at random. It's a prime number, acyclical
-     * by nature, and should therefore hopefully dampen sympathetic resonance.
+    /* 997 is not completely chosen at random. It's a prime number, acyclic by
+     * nature, and should therefore hopefully dampen sympathetic resonance.
      */
     for (i = 0; i < 997; i++) {
-      /* rc=0 -- handle is not pending.
-       * rc=1 -- handle is pending, other thread is still working with it.
-       * rc=2 -- handle is pending, other thread is done.
-       */
-      rc = cmpxchgi(&handle->pending, 2, 0);
-
-      if (rc != 1)
-        return rc;
+      if (atomic_load(busy) == 0)
+        return;

       /* Other thread is busy with this handle, spin until it's done. */
-      // cpu_relax();
+      uv__cpu_relax();
     }

     /* Yield the CPU. We may have preempted the other thread while it's
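
For context: libuv 1.48 splits the old three-state pending flag into two fields. handle->pending becomes a plain wakeup flag, and handle->u.fd is repurposed as a counter of senders inside uv_async_send()'s critical section. A condensed standalone sketch of the protocol, with illustrative names (wake_loop stands in for uv__async_send):

#include <stdatomic.h>

static _Atomic int pending;  /* wakeup flag, i.e. handle->pending     */
static _Atomic int busy;     /* in-flight sender count, handle->u.fd  */

/* Sender thread, condensed from uv_async_send() below: */
static void send_side(void (*wake_loop)(void)) {
  atomic_fetch_add(&busy, 1);              /* enter critical section */
  if (atomic_exchange(&pending, 1) == 0)   /* only the first sender wakes */
    wake_loop();
  atomic_fetch_add(&busy, -1);             /* leave critical section */
}

/* Closing loop thread, condensed from uv__async_spin() above: */
static void close_side(void) {
  atomic_store(&pending, 1);               /* no new wakeups after this */
  while (atomic_load(&busy) != 0)          /* wait out in-flight senders */
    ;                                      /* spin / uv__cpu_relax() */
}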
@@ -123,6 +115,7 @@ static void uv__async_io(uv_loop_t* loop) {
   struct uv__queue queue;
   struct uv__queue* q;
   uv_async_t* h;
+  _Atomic int *pending;

   uv__queue_move(&loop->async_handles, &queue);
   while (!uv__queue_empty(&queue)) {
@@ -132,8 +125,10 @@
     uv__queue_remove(q);
     uv__queue_insert_tail(&loop->async_handles, q);

-    if (0 == uv__async_spin(h))
-      continue;  /* Not pending. */
+    /* Atomically fetch and clear pending flag */
+    pending = (_Atomic int*) &h->pending;
+    if (atomic_exchange(pending, 0) == 0)
+      continue;

     if (h->async_cb == NULL)
       continue;
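
The exchange above gives a lock-free fetch-and-clear: the loop thread reads whether the handle was pending and resets the flag in one indivisible step, so a concurrent uv_async_send() either lands before the exchange (handled now) or after it (re-arms pending for the next pass). A tiny standalone illustration:

#include <stdatomic.h>
#include <stdio.h>

int main(void) {
  _Atomic int pending = 1;
  /* Read the old value and clear the flag in a single atomic step. */
  int was_pending = atomic_exchange(&pending, 0);
  printf("was_pending=%d, now=%d\n", was_pending, atomic_load(&pending));
  return 0;
}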
@@ -186,20 +181,25 @@ static void uv__async_send(uv_loop_t* loop) {
 #define ACCESS_ONCE(type, var) (*(volatile type*) &(var))

 int uv_async_send(uv_async_t* handle) {
+  _Atomic int* pending;
+  _Atomic int* busy;
+
+  pending = (_Atomic int*) &handle->pending;
+  busy = (_Atomic int*) &handle->u.fd;
+
   /* Do a cheap read first. */
-  if (ACCESS_ONCE(int, handle->pending) != 0)
+  if (atomic_load_explicit(pending, memory_order_relaxed) != 0)
     return 0;

-  /* Tell the other thread we're busy with the handle. */
-  if (cmpxchgi(&handle->pending, 0, 1) != 0)
-    return 0;
+  /* Set the loop to busy. */
+  atomic_fetch_add(busy, 1);

   /* Wake up the other thread's event loop. */
-  uv__async_send(handle->loop);
+  if (atomic_exchange(pending, 1) == 0)
+    uv__async_send(handle->loop);

-  /* Tell the other thread we're done. */
-  if (cmpxchgi(&handle->pending, 1, 2) != 1)
-    abort();
+  /* Set the loop to not-busy. */
+  atomic_fetch_add(busy, -1);

   return 0;
 }
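
For readers new to this file's public API: uv_async_send() may be called from any thread, and several sends can be coalesced into a single callback invocation, so the callback must not assume a 1:1 send-to-callback ratio. A minimal usage sketch against a stock libuv build (error checks omitted for brevity):

#include <stdio.h>
#include <uv.h>

static uv_async_t async_handle;

static void async_cb(uv_async_t* handle) {
  /* Always runs on the loop thread, possibly once for many sends. */
  printf("woken up on the loop thread\n");
  uv_close((uv_handle_t*) handle, NULL);
}

static void worker(void* arg) {
  (void) arg;
  uv_async_send(&async_handle);  /* safe to call from any thread */
}

int main(void) {
  uv_thread_t thread;
  uv_async_init(uv_default_loop(), &async_handle, async_cb);
  uv_thread_create(&thread, worker, NULL);
  uv_run(uv_default_loop(), UV_RUN_DEFAULT);
  uv_thread_join(&thread);
  return 0;
}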
@@ -210,4 +210,16 @@ void uv__async_close(uv_async_t* handle) {
   uv__handle_stop(handle);
 }

+static void uv__cpu_relax(void) {
+#if defined(__i386__) || defined(__x86_64__)
+  __asm__ __volatile__ ("rep; nop" ::: "memory");  /* a.k.a. PAUSE */
+#elif (defined(__arm__) && __ARM_ARCH >= 7) || defined(__aarch64__)
+  __asm__ __volatile__ ("yield" ::: "memory");
+#elif (defined(__ppc__) || defined(__ppc64__)) && defined(__APPLE__)
+  __asm volatile ("" : : : "memory");
+#elif !defined(__APPLE__) && (defined(__powerpc64__) || defined(__ppc64__) || defined(__PPC64__))
+  __asm__ __volatile__ ("or 1,1,1; or 2,2,2" ::: "memory");
+#endif
+}
+
 #endif
