Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix async #729

Closed
wants to merge 12 commits into from
100 changes: 90 additions & 10 deletions src/async.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@
*/
#include "private.h"

typedef struct {
luv_thread_arg_t targ;
uv_mutex_t mutex; // protect thread arg modifications
lua_State* L; // sender Lua state or NULL when nothing to send
} luv_async_arg_t;

static uv_async_t* luv_check_async(lua_State* L, int index) {
uv_async_t* handle = (uv_async_t*)luv_checkudata(L, index, "uv_async");
luaL_argcheck(L, handle->type == UV_ASYNC && handle->data, index, "Expected uv_async_t");
Expand All @@ -25,39 +31,113 @@ static uv_async_t* luv_check_async(lua_State* L, int index) {
static void luv_async_cb(uv_async_t* handle) {
luv_handle_t* data = (luv_handle_t*)handle->data;
lua_State* L = data->ctx->L;
int n = luv_thread_arg_push(L, (luv_thread_arg_t*)data->extra, LUVF_THREAD_SIDE_MAIN);
luv_call_callback(L, data, LUV_ASYNC, n);
luv_thread_arg_clear(L, (luv_thread_arg_t*)data->extra, LUVF_THREAD_SIDE_MAIN);
luv_async_arg_t* asarg = (luv_async_arg_t *)((luv_handle_t*) handle->data)->extra;
uv_mutex_t *argmutex = &asarg->mutex;
int n = -1;
uv_mutex_lock(argmutex);
if (asarg->L) {
n = luv_thread_arg_push(L, &asarg->targ, LUVF_THREAD_SIDE_MAIN);
luv_thread_arg_clear(L, &asarg->targ, LUVF_THREAD_SIDE_MAIN);
asarg->L = NULL;
}
uv_mutex_unlock(argmutex);
if (n >= 0) {
luv_call_callback(L, data, LUV_ASYNC, n);
}
}

static int luv_new_async(lua_State* L) {
uv_async_t* handle;
luv_handle_t* data;
luv_ref_t *ref;
int ret;
luv_ctx_t* ctx = luv_context(L);
luaL_checktype(L, 1, LUA_TFUNCTION);
handle = (uv_async_t*)luv_newuserdata(L, uv_handle_size(UV_ASYNC));
ref = (luv_ref_t*)luv_newuserdata(L, sizeof(luv_ref_t));
handle = &ref->handle.async;
ret = uv_mutex_init(&ref->mutex);
if (ret < 0) {
lua_pop(L, 1);
return luv_error(L, ret);
}
ret = uv_async_init(ctx->loop, handle, luv_async_cb);
if (ret < 0) {
uv_mutex_destroy(&ref->mutex);
lua_pop(L, 1);
return luv_error(L, ret);
}
ref->count = 1;
data = luv_setup_handle(L, ctx);
data->extra = (luv_thread_arg_t*)malloc(sizeof(luv_thread_arg_t));
luv_async_arg_t* asarg = (luv_async_arg_t*)malloc(sizeof(luv_async_arg_t));
data->extra = asarg;
data->extra_gc = free;
memset(data->extra, 0, sizeof(luv_thread_arg_t));
memset(asarg, 0, sizeof(luv_async_arg_t));
ret = uv_mutex_init(&asarg->mutex);
if (ret < 0) { // unlikely
abort();
}
handle->data = data;
luv_check_callback(L, (luv_handle_t*)handle->data, LUV_ASYNC, 1);
return 1;
}

static void luv_async_arg_clear_child(lua_State* L, luv_async_arg_t* asarg) {
if (asarg->L) {
luv_thread_arg_clear(L == asarg->L ? L : NULL, &asarg->targ, LUVF_THREAD_SIDE_CHILD);
asarg->L = NULL;
}
}

static int luv_handle_gc(lua_State* L);

static int luv_async_gc(lua_State* L) {
luv_ref_t* ref = *(luv_ref_t**)lua_touserdata(L, 1);
uv_mutex_t *refmutex = &ref->mutex;
luv_handle_t* data = (luv_handle_t*)ref->handle.async.data;
luv_async_arg_t* asarg = (luv_async_arg_t *)data->extra;
uv_mutex_t *argmutex = &asarg->mutex;
int count;
// decrease reference count
uv_mutex_lock(refmutex);
ref->count--;
count = ref->count;
uv_mutex_unlock(refmutex);
if (count > 0) {
return 0;
}
uv_mutex_lock(argmutex);
luv_async_arg_clear_child(L, asarg);
uv_mutex_unlock(argmutex);
// destroy
uv_mutex_destroy(argmutex);
uv_mutex_destroy(refmutex);
return luv_handle_gc(L);
}

static int luv_async_send(lua_State* L) {
int ret;
uv_async_t* handle = luv_check_async(L, 1);
luv_thread_arg_t* arg = (luv_thread_arg_t *)((luv_handle_t*) handle->data)->extra;

luv_thread_arg_set(L, arg, 2, lua_gettop(L), LUVF_THREAD_MODE_ASYNC|LUVF_THREAD_SIDE_CHILD);
luv_async_arg_t* asarg = (luv_async_arg_t *)((luv_handle_t*) handle->data)->extra;
uv_mutex_t *argmutex = &asarg->mutex;
int n;
uv_mutex_lock(argmutex);
luv_async_arg_clear_child(L, asarg);
n = luv_thread_arg_set(L, &asarg->targ, 2, lua_gettop(L), LUVF_THREAD_MODE_ASYNC|LUVF_THREAD_SIDE_CHILD);
if (n >= 0) {
asarg->L = L;
}
uv_mutex_unlock(argmutex);
if (n < 0) {
return luv_thread_arg_error(L);
}
ret = uv_async_send(handle);
luv_thread_arg_clear(L, arg, LUVF_THREAD_SIDE_CHILD);
// clear on gc
return luv_result(L, ret);
}

static void luv_async_init(lua_State* L) {
luaL_getmetatable(L, "uv_async");
lua_pushcfunction(L, luv_async_gc);
lua_setfield(L, -2, "__gc");
lua_pop(L, 1);
}
8 changes: 8 additions & 0 deletions src/lthreadpool.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,14 @@ typedef struct {
luv_val_t argv[LUV_THREAD_MAXNUM_ARG];
} luv_thread_arg_t;

typedef struct {
union {
uv_async_t async;
} handle;
uv_mutex_t mutex;
int count;
} luv_ref_t;

//luajit miss LUA_OK
#ifndef LUA_OK
#define LUA_OK 0
Expand Down
1 change: 1 addition & 0 deletions src/luv.c
Original file line number Diff line number Diff line change
Expand Up @@ -892,6 +892,7 @@ LUALIB_API int luaopen_luv (lua_State* L) {

luv_req_init(L);
luv_handle_init(L);
luv_async_init(L);
#if LUV_UV_VERSION_GEQ(1, 28, 0)
luv_dir_init(L);
#endif
Expand Down
114 changes: 75 additions & 39 deletions src/thread.c
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,7 @@ static int luv_thread_arg_set(lua_State* L, luv_thread_arg_t* args, int idx, int
}
break;
case LUA_TSTRING:
if (async)
{
if (async) {
const char* p = lua_tolstring(L, i, &arg->val.str.len);
arg->val.str.base = malloc(arg->val.str.len);
memcpy((void*)arg->val.str.base, p, arg->val.str.len);
Expand All @@ -103,13 +102,28 @@ static int luv_thread_arg_set(lua_State* L, luv_thread_arg_t* args, int idx, int
}
break;
case LUA_TUSERDATA:
arg->val.udata.data = lua_topointer(L, i);
arg->val.udata.size = lua_rawlen(L, i);
arg->val.udata.metaname = luv_getmtname(L, i);

if (arg->val.udata.size) {
lua_pushvalue(L, i);
arg->ref[side] = luaL_ref(L, LUA_REGISTRYINDEX);
{
const void* p = lua_topointer(L, i);
size_t l = lua_rawlen(L, i);
const char* mtname = luv_getmtname(L, i);
if (async) {
if (p && l) {
void* b = malloc(l);
memcpy(b, p, l);
p = (const void*)b;
}
if (mtname) {
char* b = malloc(strlen(mtname) + 1);
strcpy(b, mtname);
mtname = (const void*)b;
}
} else {
lua_pushvalue(L, i);
arg->ref[side] = luaL_ref(L, LUA_REGISTRYINDEX);
}
arg->val.udata.data = p;
arg->val.udata.size = l;
arg->val.udata.metaname = mtname;
}
break;
default:
Expand All @@ -127,7 +141,7 @@ static int luv_thread_arg_set(lua_State* L, luv_thread_arg_t* args, int idx, int
static void luv_thread_arg_clear(lua_State* L, luv_thread_arg_t* args, int flags) {
int i;
int side = LUVF_THREAD_SIDE(flags);
int set = LUVF_THREAD_SIDE(args->flags);
int pushed = LUVF_THREAD_SIDE(args->flags) != side; // clear is called on a side different from the set one
int async = LUVF_THREAD_ASYNC(args->flags);

if (args->argc == 0)
Expand All @@ -137,44 +151,49 @@ static void luv_thread_arg_clear(lua_State* L, luv_thread_arg_t* args, int flags
luv_val_t* arg = args->argv + i;
switch (arg->type) {
case LUA_TSTRING:
if (arg->ref[side] != LUA_NOREF)
{
if (arg->ref[side] != LUA_NOREF) {
luaL_unref(L, LUA_REGISTRYINDEX, arg->ref[side]);
arg->ref[side] = LUA_NOREF;
} else {
if(async && set!=side)
{
free((void*)arg->val.str.base);
arg->val.str.base = NULL;
arg->val.str.len = 0;
}
}
if (async && pushed) {
free((void*)arg->val.str.base);
arg->val.str.base = NULL;
arg->val.str.len = 0;
}
break;
case LUA_TUSERDATA:
if (arg->ref[side]!=LUA_NOREF)
{
if (side != set)
{
if (arg->ref[side] != LUA_NOREF) {
if (pushed) {
// avoid custom gc
lua_rawgeti(L, LUA_REGISTRYINDEX, arg->ref[side]);
lua_pushnil(L);
lua_setmetatable(L, -2);
lua_pop(L, -1);
lua_pop(L, 1);
}
luaL_unref(L, LUA_REGISTRYINDEX, arg->ref[side]);
arg->ref[side] = LUA_NOREF;
}
if (async && pushed) {
free((void*)arg->val.udata.data);
free((void*)arg->val.udata.metaname);
arg->val.udata.data = NULL;
arg->val.udata.size = 0;
arg->val.udata.metaname = NULL;
}
break;
default:
break;
}
}
}

#define luv_thread_can_reference(_mtname) ((_mtname) && (strcmp((_mtname), "uv_async") == 0))

// called only in thread
static int luv_thread_arg_push(lua_State* L, luv_thread_arg_t* args, int flags) {
int i = 0;
int side = LUVF_THREAD_SIDE(flags);
int async = LUVF_THREAD_ASYNC(args->flags);

while (i < args->argc) {
luv_val_t* arg = args->argv + i;
Expand All @@ -196,19 +215,31 @@ static int luv_thread_arg_push(lua_State* L, luv_thread_arg_t* args, int flags)
lua_pushlstring(L, arg->val.str.base, arg->val.str.len);
break;
case LUA_TUSERDATA:
if (arg->val.udata.size)
{
char *p = lua_newuserdata(L, arg->val.udata.size);
memcpy(p, arg->val.udata.data, arg->val.udata.size);
if (arg->val.udata.metaname)
{
luaL_getmetatable(L, arg->val.udata.metaname);
lua_setmetatable(L, -2);
if (arg->val.udata.metaname) {
int setmt = !async;
int needref = setmt;
if (luv_thread_can_reference(arg->val.udata.metaname)) {
luv_ref_t* ref = *(luv_ref_t**)p;
uv_mutex_lock(&ref->mutex);
if (ref->count > 0) {
ref->count++;
setmt = 1;
needref = 0;
}
uv_mutex_unlock(&ref->mutex);
}
if (setmt) {
luaL_getmetatable(L, arg->val.udata.metaname);
lua_setmetatable(L, -2);
if (needref) {
lua_pushvalue(L, -1);
arg->ref[side] = luaL_ref(L, LUA_REGISTRYINDEX);
}
}
}
lua_pushvalue(L, -1);
arg->ref[side] = luaL_ref(L, LUA_REGISTRYINDEX);
}else{
lua_pushlightuserdata(L, (void*)arg->val.udata.data);
}
break;
default:
Expand Down Expand Up @@ -306,9 +337,11 @@ static void luv_thread_cb(void* varg) {

static void luv_thread_notify_close_cb(uv_handle_t *handle) {
luv_thread_t *thread = handle->data;
if (thread->handle != 0)
uv_thread_join(&thread->handle);

uv_thread_t uvt = thread->handle;
if (uvt != 0) {
thread->handle = 0;
uv_thread_join(&uvt);
}
luaL_unref(thread->L, LUA_REGISTRYINDEX, thread->ref);
thread->ref = LUA_NOREF;
thread->L = NULL;
Expand Down Expand Up @@ -494,9 +527,12 @@ static int luv_thread_setpriority(lua_State* L) {

static int luv_thread_join(lua_State* L) {
luv_thread_t* tid = luv_check_thread(L, 1);
int ret = uv_thread_join(&tid->handle);
if (ret < 0) return luv_error(L, ret);
tid->handle = 0;
uv_thread_t uvt = tid->handle;
if (uvt != 0) {
tid->handle = 0;
int ret = uv_thread_join(&uvt);
if (ret < 0) return luv_error(L, ret);
}
lua_pushboolean(L, 1);
return 1;
}
Expand Down
Loading
Loading