Skip to content

Commit

Permalink
uv__queue
Browse files Browse the repository at this point in the history
  • Loading branch information
toyobayashi committed Jun 21, 2024
1 parent 5f4b9f5 commit 607c14d
Show file tree
Hide file tree
Showing 10 changed files with 157 additions and 169 deletions.
16 changes: 11 additions & 5 deletions packages/emnapi/include/node/uv.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ struct uv__queue {
extern "C" {
#endif

/* Internal type, do not use. */
struct uv__queue {
struct uv__queue* next;
struct uv__queue* prev;
};

#define UV_EXTERN /* nothing */

typedef enum {
Expand Down Expand Up @@ -131,7 +137,7 @@ typedef void (*uv_async_cb)(uv_async_t* handle);
uv_loop_t* loop; \
uv_handle_type type; \
uv_close_cb close_cb; \
void* handle_queue[2]; \
struct uv__queue handle_queue; \
union { \
int fd; \
void* reserved[4]; \
Expand All @@ -145,7 +151,7 @@ struct uv_handle_s {
struct uv_async_s {
UV_HANDLE_FIELDS
uv_async_cb async_cb;
void* queue[2];
struct uv__queue queue;
int pending;
};

Expand All @@ -171,17 +177,17 @@ UV_EXTERN uint64_t uv_metrics_idle_time(uv_loop_t* loop);
struct uv_loop_s {
void* data;
unsigned int active_handles;
void* handle_queue[2];
struct uv__queue handle_queue;
union {
void* unused;
unsigned int count;
} active_reqs;
void* internal_fields;

void* wq[2];
struct uv__queue wq;
uv_mutex_t wq_mutex;
uv_async_t wq_async;
void* async_handles[2];
struct uv__queue async_handles;
void* em_queue;
};

Expand Down
2 changes: 1 addition & 1 deletion packages/emnapi/include/node/uv/threadpool.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ struct uv__work {
void (*work)(struct uv__work *w);
void (*done)(struct uv__work *w, int status);
struct uv_loop_s* loop;
void* wq[2];
struct uv__queue wq;
};

#endif
Expand Down
32 changes: 16 additions & 16 deletions packages/emnapi/src/threadsafe_function.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ struct napi_threadsafe_function__ {
pthread_mutex_t mutex;
pthread_cond_t* cond;
size_t queue_size;
void* queue[2];
struct uv__queue queue;
uv_async_t async;
size_t thread_count;
bool is_closing;
Expand Down Expand Up @@ -92,7 +92,7 @@ _emnapi_tsfn_create(napi_env env,
pthread_mutex_init(&ts_fn->mutex, NULL);
ts_fn->cond = NULL;
ts_fn->queue_size = 0;
QUEUE_INIT(&ts_fn->queue);
uv__queue_init(&ts_fn->queue);
ts_fn->thread_count = initial_thread_count;
ts_fn->is_closing = false;
ts_fn->dispatch_state = kDispatchIdle;
Expand Down Expand Up @@ -125,13 +125,13 @@ static void _emnapi_tsfn_destroy(napi_threadsafe_function func) {
func->cond = NULL;
}

QUEUE* tmp;
struct uv__queue* tmp;
struct data_queue_node* node;
QUEUE_FOREACH(tmp, &func->queue) {
node = QUEUE_DATA(tmp, struct data_queue_node, q);
uv__queue_foreach(tmp, &func->queue) {
node = uv__queue_data(tmp, struct data_queue_node, q);
free(node);
}
QUEUE_INIT(&func->queue);
uv__queue_init(&func->queue);

if (func->ref != NULL) {
EMNAPI_ASSERT_CALL(napi_delete_reference(func->env, func->ref));
Expand Down Expand Up @@ -187,14 +187,14 @@ static napi_status _emnapi_tsfn_init(napi_threadsafe_function func) {
}

static void _emnapi_tsfn_empty_queue_and_delete(napi_threadsafe_function func) {
while (!QUEUE_EMPTY(&func->queue)) {
QUEUE* q = QUEUE_HEAD(&func->queue);
struct data_queue_node* node = QUEUE_DATA(q, struct data_queue_node, q);
while (!uv__queue_empty(&func->queue)) {
struct uv__queue* q = uv__queue_head(&func->queue);
struct data_queue_node* node = uv__queue_data(q, struct data_queue_node, q);

func->call_js_cb(NULL, NULL, func->context, node->data);

QUEUE_REMOVE(q);
QUEUE_INIT(q);
uv__queue_remove(q);
uv__queue_init(q);
func->queue_size--;
free(node);
}
Expand Down Expand Up @@ -295,10 +295,10 @@ static bool _emnapi_tsfn_dispatch_one(napi_threadsafe_function func) {
} else {
size_t size = func->queue_size;
if (size > 0) {
QUEUE* q = QUEUE_HEAD(&func->queue);
struct data_queue_node* node = QUEUE_DATA(q, struct data_queue_node, q);
QUEUE_REMOVE(q);
QUEUE_INIT(q);
struct uv__queue* q = uv__queue_head(&func->queue);
struct data_queue_node* node = uv__queue_data(q, struct data_queue_node, q);
uv__queue_remove(q);
uv__queue_init(q);
func->queue_size--;
data = node->data;
free(node);
Expand Down Expand Up @@ -519,7 +519,7 @@ napi_call_threadsafe_function(napi_threadsafe_function func,
return napi_generic_failure;
}
queue_node->data = data;
QUEUE_INSERT_TAIL(&func->queue, &queue_node->q);
uv__queue_insert_tail(&func->queue, &queue_node->q);
func->queue_size++;
_emnapi_tsfn_send(func);
pthread_mutex_unlock(&func->mutex);
Expand Down
154 changes: 68 additions & 86 deletions packages/emnapi/src/uv/queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,91 +18,73 @@

#include <stddef.h>

typedef void *QUEUE[2];

/* Private macros. */
#define QUEUE_NEXT(q) (*(QUEUE **) &((*(q))[0]))
#define QUEUE_PREV(q) (*(QUEUE **) &((*(q))[1]))
#define QUEUE_PREV_NEXT(q) (QUEUE_NEXT(QUEUE_PREV(q)))
#define QUEUE_NEXT_PREV(q) (QUEUE_PREV(QUEUE_NEXT(q)))

/* Public macros. */
#define QUEUE_DATA(ptr, type, field) \
((type *) ((char *) (ptr) - offsetof(type, field)))

/* Important note: mutating the list while QUEUE_FOREACH is
* iterating over its elements results in undefined behavior.
*/
#define QUEUE_FOREACH(q, h) \
for ((q) = QUEUE_NEXT(h); (q) != (h); (q) = QUEUE_NEXT(q))

#define QUEUE_EMPTY(q) \
((const QUEUE *) (q) == (const QUEUE *) QUEUE_NEXT(q))

#define QUEUE_HEAD(q) \
(QUEUE_NEXT(q))

#define QUEUE_INIT(q) \
do { \
QUEUE_NEXT(q) = (q); \
QUEUE_PREV(q) = (q); \
} \
while (0)

#define QUEUE_ADD(h, n) \
do { \
QUEUE_PREV_NEXT(h) = QUEUE_NEXT(n); \
QUEUE_NEXT_PREV(n) = QUEUE_PREV(h); \
QUEUE_PREV(h) = QUEUE_PREV(n); \
QUEUE_PREV_NEXT(h) = (h); \
} \
while (0)

#define QUEUE_SPLIT(h, q, n) \
do { \
QUEUE_PREV(n) = QUEUE_PREV(h); \
QUEUE_PREV_NEXT(n) = (n); \
QUEUE_NEXT(n) = (q); \
QUEUE_PREV(h) = QUEUE_PREV(q); \
QUEUE_PREV_NEXT(h) = (h); \
QUEUE_PREV(q) = (n); \
} \
while (0)

#define QUEUE_MOVE(h, n) \
do { \
if (QUEUE_EMPTY(h)) \
QUEUE_INIT(n); \
else { \
QUEUE* q = QUEUE_HEAD(h); \
QUEUE_SPLIT(h, q, n); \
} \
} \
while (0)

#define QUEUE_INSERT_HEAD(h, q) \
do { \
QUEUE_NEXT(q) = QUEUE_NEXT(h); \
QUEUE_PREV(q) = (h); \
QUEUE_NEXT_PREV(q) = (q); \
QUEUE_NEXT(h) = (q); \
} \
while (0)

#define QUEUE_INSERT_TAIL(h, q) \
do { \
QUEUE_NEXT(q) = (h); \
QUEUE_PREV(q) = QUEUE_PREV(h); \
QUEUE_PREV_NEXT(q) = (q); \
QUEUE_PREV(h) = (q); \
} \
while (0)

#define QUEUE_REMOVE(q) \
do { \
QUEUE_PREV_NEXT(q) = QUEUE_NEXT(q); \
QUEUE_NEXT_PREV(q) = QUEUE_PREV(q); \
} \
while (0)
#define uv__queue_data(pointer, type, field) \
((type*) ((char*) (pointer) - offsetof(type, field)))

#define uv__queue_foreach(q, h) \
for ((q) = (h)->next; (q) != (h); (q) = (q)->next)

static inline void uv__queue_init(struct uv__queue* q) {
q->next = q;
q->prev = q;
}

static inline int uv__queue_empty(const struct uv__queue* q) {
return q == q->next;
}

static inline struct uv__queue* uv__queue_head(const struct uv__queue* q) {
return q->next;
}

static inline struct uv__queue* uv__queue_next(const struct uv__queue* q) {
return q->next;
}

static inline void uv__queue_add(struct uv__queue* h, struct uv__queue* n) {
h->prev->next = n->next;
n->next->prev = h->prev;
h->prev = n->prev;
h->prev->next = h;
}

static inline void uv__queue_split(struct uv__queue* h,
struct uv__queue* q,
struct uv__queue* n) {
n->prev = h->prev;
n->prev->next = n;
n->next = q;
h->prev = q->prev;
h->prev->next = h;
q->prev = n;
}

static inline void uv__queue_move(struct uv__queue* h, struct uv__queue* n) {
if (uv__queue_empty(h))
uv__queue_init(n);
else
uv__queue_split(h, h->next, n);
}

static inline void uv__queue_insert_head(struct uv__queue* h,
struct uv__queue* q) {
q->next = h->next;
q->prev = h;
q->next->prev = q;
h->next = q;
}

static inline void uv__queue_insert_tail(struct uv__queue* h,
struct uv__queue* q) {
q->next = h;
q->prev = h->prev;
q->prev->next = q;
h->prev = q;
}

static inline void uv__queue_remove(struct uv__queue* q) {
q->prev->next = q->next;
q->next->prev = q->prev;
}

#endif /* QUEUE_H_ */
Loading

0 comments on commit 607c14d

Please sign in to comment.