Skip to content

Commit

Permalink
Merge pull request #135 from albertnetymk/await
Browse files Browse the repository at this point in the history
Add `await`. (GC disabled while processing await.)
  • Loading branch information
Kiko committed Apr 23, 2015
2 parents 99f976b + 7385d02 commit 7b21d5d
Show file tree
Hide file tree
Showing 5 changed files with 160 additions and 123 deletions.
172 changes: 76 additions & 96 deletions src/runtime/encore/encore.c
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#define _XOPEN_SOURCE 800
#include <pony/pony.h>
#include "encore.h"
#include "closure.h"
Expand All @@ -12,15 +13,29 @@
#include <stdio.h>
#include "task.h"

extern void public_run(pthread_mutex_t *lock);
#ifdef LAZY_IMPL
__attribute__ ((noreturn))
static void actor_resume(encore_actor_t *actor);
__attribute__ ((noreturn))
static void actor_suspend_resume(encore_actor_t *actor, ucontext_t *ctx);
__attribute__ ((noreturn))
static void actor_await_resume(encore_actor_t *actor, ucontext_t *ctx);
#else
static void actor_resume(encore_actor_t *actor);
static void actor_suspend_resume(encore_actor_t *actor, ucontext_t *ctx);
static void actor_await_resume(encore_actor_t *actor, ucontext_t *ctx);
#endif

enum
{
FLAG_BLOCKED = 1 << 0,
FLAG_SYSTEM = 1 << 1,
FLAG_UNSCHEDULED = 1 << 2,
FLAG_PENDINGDESTROY = 1 << 3,
};
#ifdef LAZY_IMPL
__attribute__ ((noreturn))
static void actor_resume_context(encore_actor_t *actor, ucontext_t *ctx);
#else
static void actor_resume_context(encore_actor_t *actor, ucontext_t *ctx);
#endif

extern void public_run(pony_actor_t *actor, bool reschedule);

extern bool pony_system_actor();

#define MAX_IN_POOL 4

Expand All @@ -37,6 +52,16 @@ __pony_thread_local context *this_context;
__pony_thread_local ucontext_t *origin;
__pony_thread_local ucontext_t *root;

void actor_unlock(encore_actor_t *actor)
{
if (!pony_system_actor()) {
if (actor->lock) {
pthread_mutex_unlock(actor->lock);
actor->lock = NULL;
}
}
}

#ifndef LAZY_IMPL

static __pony_thread_local stack_page *stack_pool = NULL;
Expand Down Expand Up @@ -90,14 +115,6 @@ bool actor_run_to_completion(encore_actor_t *actor)
return actor->run_to_completion;
}

void actor_unlock(encore_actor_t *actor)
{
if (actor->lock) {
pthread_mutex_unlock(actor->lock);
actor->lock = NULL;
}
}

#endif

#ifdef LAZY_IMPL
Expand All @@ -116,7 +133,7 @@ static context *pop_context(encore_actor_t *actor)
} else {
available_context--;
}
makecontext(&context_pool->ctx, (void(*)(void))public_run, 1, actor->lock);
makecontext(&context_pool->ctx, (void(*)(void))public_run, 1, actor);
c = context_pool;
context_pool = c->next;
assert(c->ctx.uc_stack.ss_sp);
Expand Down Expand Up @@ -161,7 +178,7 @@ static void clean_pool()
#endif
}

void actor_block(encore_actor_t *actor)
void actor_save_context(encore_actor_t *actor, ucontext_t *ctx)
{
#ifndef LAZY_IMPL

Expand All @@ -173,49 +190,28 @@ void actor_block(encore_actor_t *actor)
assert(actor->page);
assert(actor->page->stack);
actor->run_to_completion = false;
assert_swap(&actor->ctx, actor->ctx.uc_link);
assert_swap(ctx, &actor->home_ctx);

#else

actor->saved = &this_context->ctx;

context *old = this_context;

this_context = pop_context(actor);
assert_swap(actor->saved, &this_context->ctx);

assert_swap(ctx, &this_context->ctx);
this_context = old;

#endif
}

void actor_set_resume(encore_actor_t *actor)
void actor_block(encore_actor_t *actor)
{
actor->resume = true;
}

void actor_resume(encore_actor_t *actor)
{
#ifndef LAZY_IMPL

actor->resume = false;
actor->run_to_completion = true;
assert_swap(actor->ctx.uc_link, &actor->ctx);

if (actor->run_to_completion) {
reclaim_page(actor);
}

actor_save_context(actor, &actor->ctx);
#else

actor->resume = false;
if (&this_context->ctx != root) {
push_context(this_context);
}
setcontext(actor->saved);
assert(0);

actor->saved = &this_context->ctx;
actor_save_context(actor, actor->saved);
#endif

}

void actor_suspend()
Expand All @@ -224,49 +220,36 @@ void actor_suspend()
actor->suspend_counter++;

ucontext_t ctx;

#ifndef LAZY_IMPL

if (!actor->page) {
assert(local_page->stack);
actor->page = local_page;
local_page = NULL;
}

assert(actor->page);
assert(actor->page->stack);

actor->run_to_completion = false;

pony_sendp((pony_actor_t*) actor, _ENC__MSG_RESUME_SUSPEND, &ctx);

assert_swap(&ctx, &actor->home_ctx);
actor_save_context(actor, &ctx);

actor->suspend_counter--;
assert(actor->suspend_counter >= 0);
}

#else
context *old = this_context;

pony_sendp((pony_actor_t*) actor, _ENC__MSG_RESUME_SUSPEND, &ctx);
void actor_await(ucontext_t *ctx)
{
encore_actor_t *actor = (encore_actor_t *) actor_current();
actor->await_counter++;

this_context = pop_context(actor);
scheduler_add((pony_actor_t*) actor);
assert_swap(&ctx, &this_context->ctx);
actor_save_context(actor, ctx);

this_context = old;
actor->await_counter--;

#endif

actor->suspend_counter--;
assert(actor->await_counter >= 0);
}

void actor_suspend_resume(ucontext_t *ctx)
void actor_set_resume(encore_actor_t *actor)
{
encore_actor_t *actor = (encore_actor_t *) actor_current();
actor->resume = true;
}

static void actor_resume_context(encore_actor_t* actor, ucontext_t *ctx)
{
#ifndef LAZY_IMPL

actor_set_run_to_completion(actor);
actor->run_to_completion = true;

assert_swap(&actor->home_ctx, ctx);

Expand All @@ -283,35 +266,33 @@ void actor_suspend_resume(ucontext_t *ctx)
assert(0);

#endif

}

// TODO: suspend and await overlaps heavily
void actor_await(encore_actor_t *actor, void *future)
static void actor_resume(encore_actor_t *actor)
{
actor->resume = false;
#ifndef LAZY_IMPL
// Make a copy of the current context and replace it
ucontext_t ctxp = actor->ctx;
ctx_wrapper d = { .ctx = &ctxp, .uc_link=ctxp.uc_link };
encore_arg_t argv[2] = { { .p = &d }, { .p = future } };

actor->page = local_page;
local_page = NULL;
assert(actor->ctx.uc_link == &actor->home_ctx);
actor_resume_context(actor, &actor->ctx);
#else
actor_resume_context(actor, actor->saved);
#endif
}

// TODO find out the right way of calling pond_sendv
// pony_sendv(actor, FUT_MSG_AWAIT, 2, argv);
static void actor_suspend_resume(encore_actor_t *actor, ucontext_t *ctx)
{
actor_resume_context(actor, ctx);
}

actor->run_to_completion = false;
int ret = swapcontext(&ctxp, ctxp.uc_link);
assert(ret == 0);
assert(ctxp.uc_link == &actor->home_ctx);
#endif
static void actor_await_resume(encore_actor_t *actor, ucontext_t *ctx)
{
actor_resume_context(actor, ctx);
}

bool gc_disabled()
{
encore_actor_t *actor = (encore_actor_t*) actor_current();
return actor->suspend_counter > 0;
return actor->suspend_counter > 0 || actor->await_counter > 0;
}

encore_actor_t *encore_create(pony_type_t *type)
Expand Down Expand Up @@ -365,12 +346,11 @@ bool encore_actor_handle_message_hook(encore_actor_t *actor, pony_msg_t* msg)
switch(msg->id)
{
case _ENC__MSG_RESUME_SUSPEND:
actor_suspend_resume(((pony_msgp_t*)msg)->p);
actor_suspend_resume(actor, ((pony_msgp_t*)msg)->p);
return true;

case _ENC__MSG_RESUME_AWAIT:
assert(-1);
// future_await_resume(msg->argv);
actor_await_resume(actor, ((pony_msgp_t*)msg)->p);
return true;

case _ENC__MSG_RUN_CLOSURE:
Expand Down
13 changes: 2 additions & 11 deletions src/runtime/encore/encore.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ struct encore_actor
pony_actor_pad_t;
// Everything else that goes into an encore_actor that's not part of PonyRT
bool resume;
int await_counter;
int suspend_counter;
pthread_mutex_t *lock;
#ifndef LAZY_IMPL
Expand Down Expand Up @@ -118,23 +119,13 @@ bool encore_actor_run_hook(encore_actor_t *actor);
bool encore_actor_handle_message_hook(encore_actor_t *actor, pony_msg_t* msg);
void actor_block(encore_actor_t *actor);
void actor_set_resume(encore_actor_t *actor);
#ifdef LAZY_IMPL
void actor_resume(encore_actor_t *actor) __attribute__ ((noreturn));
#else
void actor_resume(encore_actor_t *actor);
#endif

#ifndef LAZY_IMPL
void actor_set_run_to_completion(encore_actor_t *actor);
bool actor_run_to_completion(encore_actor_t *actor);
#endif
void actor_suspend();

#ifdef LAZY_IMPL
void actor_suspend_resume(ucontext_t *ctx) __attribute__ ((noreturn));
#else
void actor_suspend_resume(ucontext_t *ctx);
#endif
void actor_await(ucontext_t *ctx);

/// calls the pony's respond with the current object's scheduler
void call_respond_with_current_scheduler();
Expand Down
Loading

0 comments on commit 7b21d5d

Please sign in to comment.