Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Oncomplete #60

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion source/concurrency/asyncscope.d
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@ auto asyncScope() @safe {
return as;
}

auto asyncScope(shared StopSource source) @safe {
// ensure NRVO
auto as = shared AsyncScope(source);
return as;
}

struct AsyncScope {
private:
import concurrency.bitfield : SharedBitField;
Expand All @@ -23,6 +29,7 @@ private:
shared Promise!void completion;
shared StopSource stopSource;
Throwable throwable;
shared StopCallback cb;

void forward() @trusted nothrow shared {
import core.atomic : atomicLoad;
Expand Down Expand Up @@ -61,13 +68,18 @@ public:
cleanup.syncWait();
}

this(shared StopSource stopSource) @safe shared {
this(shared StopSource stopSource) @trusted shared {
completion = new shared Promise!void;
this.stopSource = stopSource;
cb = cast(shared)this.stopSource.onStop(() @safe shared nothrow => cast(void)this.stop());
}

auto cleanup() @safe shared {
stop();
return onComplete();
}

auto onComplete() @safe shared {
return completion.sender();
}

Expand All @@ -76,6 +88,7 @@ public:
}

bool stop() nothrow @trusted shared {
cb.dispose();
import core.atomic : MemoryOrder;
if ((flag.load!(MemoryOrder.acq) & Flag.stopped) > 0)
return false;
Expand Down
85 changes: 44 additions & 41 deletions source/concurrency/stoptoken.d
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ module concurrency.stoptoken;
// it is licensed under the Creative Commons Attribution 4.0 Internation License http://creativecommons.org/licenses/by/4.0

class StopSource {
private stop_state state;
private shared stop_state state;
bool stop() nothrow @safe {
return state.request_stop();
}
Expand Down Expand Up @@ -51,28 +51,23 @@ struct NeverStopToken {
enum isStopPossible = false;
}

StopCallback onStop(StopSource stopSource, void delegate() nothrow @safe shared callback) nothrow @safe {
auto cb = new StopCallback(callback);
if (stopSource.state.try_add_callback(cb, true))
cb.source = stopSource;
return cb;
}

StopCallback onStop(StopSource stopSource, void function() nothrow @safe callback) nothrow @trusted {
import std.functional : toDelegate;
return stopSource.onStop(cast(void delegate() nothrow @safe shared)callback.toDelegate);
}

StopCallback onStop(StopToken)(StopToken stopToken, void delegate() nothrow @safe shared callback) nothrow @safe {
if (stopToken.isStopPossible) {
return stopToken.source.onStop(callback);
StopCallback onStop(Source)(Source stopSource, void delegate() nothrow @safe shared callback) nothrow @trusted {
static if (is(shared Source == shared StopSource)) {
auto cb = new StopCallback(callback);
if (stopSource.state.try_add_callback(cb, true))
cb.source = cast(shared)stopSource;
return cb;
} else {
if (stopSource.isStopPossible) {
return stopSource.source.onStop(callback);
}
return new StopCallback(callback);
}
return new StopCallback(callback);
}

StopCallback onStop(StopToken)(StopToken stopToken, void function() nothrow @safe callback) nothrow @trusted {
StopCallback onStop(Source)(Source stopSource, void function() nothrow @safe callback) nothrow @trusted {
import std.functional : toDelegate;
return stopToken.onStop(cast(void delegate() nothrow @safe shared)callback.toDelegate);
return stopSource.onStop(cast(void delegate() nothrow @safe shared)callback.toDelegate);
}

class StopCallback {
Expand Down Expand Up @@ -105,16 +100,20 @@ private:
}

void delegate() nothrow shared @safe callback;
StopSource source;
shared StopSource source;

StopCallback next_ = null;
StopCallback* prev_ = null;
bool* isRemoved_ = null;
shared StopCallback next_ = null;
shared StopCallback* prev_ = null;
shared bool* isRemoved_ = null;
shared bool callbackFinishedExecuting = false;

void execute() nothrow @safe {
callback();
}

void execute() nothrow @safe shared {
callback();
}
}

deprecated("Use regular StopToken") alias StopTokenObject = StopToken;
Expand Down Expand Up @@ -155,31 +154,31 @@ private struct stop_state {
}

public:
void add_token_reference() nothrow @safe @nogc {
void add_token_reference() nothrow @safe @nogc shared {
// TODO: want to use atomicFetchAdd but (proper) support is only recent
// state_.atomicFetchAdd!(MemoryOrder.raw)(token_ref_increment);
state_.atomicOp!"+="(token_ref_increment);
}

void remove_token_reference() nothrow @safe @nogc {
void remove_token_reference() nothrow @safe @nogc shared {
// TODO: want to use atomicFetchSub but (proper) support is only recent
// state_.atomicFetchSub!(MemoryOrder.acq_rel)(token_ref_increment);
state_.atomicOp!"-="(token_ref_increment);
}

void add_source_reference() nothrow @safe @nogc {
void add_source_reference() nothrow @safe @nogc shared {
// TODO: want to use atomicFetchAdd but (proper) support is only recent
// state_.atomicFetchAdd!(MemoryOrder.raw)(source_ref_increment);
state_.atomicOp!"+="(source_ref_increment);
}

void remove_source_reference() nothrow @safe @nogc {
void remove_source_reference() nothrow @safe @nogc shared {
// TODO: want to use atomicFetchSub but (proper) support is only recent
// state_.atomicFetchSub!(MemoryOrder.acq_rel)(source_ref_increment);
state_.atomicOp!"-="(source_ref_increment);
}

bool request_stop() nothrow @safe {
bool request_stop() nothrow @safe shared {

if (!try_lock_and_signal_until_signalled()) {
// Stop has already been requested.
Expand All @@ -188,7 +187,7 @@ public:

// Set the 'stop_requested' signal and acquired the lock.

signallingThread_ = Thread.getThis();
signallingThread_ = getThis();

while (head_ !is null) {
// Dequeue the head of the queue
Expand All @@ -212,7 +211,7 @@ public:
// If the destructor runs on some other thread then the other
// thread will block waiting for this thread to signal that the
// callback has finished executing.
bool isRemoved = false;
shared bool isRemoved = false;
(() @trusted => cb.isRemoved_ = &isRemoved)(); // the pointer to the stack here is removed 3 lines down.

cb.execute();
Expand All @@ -238,15 +237,15 @@ public:
return true;
}

bool is_stop_requested() nothrow @safe @nogc {
bool is_stop_requested() nothrow @safe @nogc shared {
return is_stop_requested(state_.atomicLoad!(MemoryOrder.acq));
}

bool is_stop_requestable() nothrow @safe @nogc {
bool is_stop_requestable() nothrow @safe @nogc shared {
return is_stop_requestable(state_.atomicLoad!(MemoryOrder.acq));
}

bool try_add_callback(StopCallback cb, bool incrementRefCountIfSuccessful) nothrow @safe {
bool try_add_callback(StopCallback cb, bool incrementRefCountIfSuccessful) nothrow @safe shared {
ulong oldState;
do {
goto load_state;
Expand All @@ -272,7 +271,7 @@ public:
cb.next_.prev_ = &cb.next_;
}
() @trusted { cb.prev_ = &head_; } ();
head_ = cb;
() @trusted { head_ = cast(shared)cb; } ();

if (incrementRefCountIfSuccessful) {
unlock_and_increment_token_ref_count();
Expand All @@ -285,7 +284,7 @@ public:
return true;
}

void remove_callback(StopCallback cb) nothrow @safe @nogc {
void remove_callback(StopCallback cb) nothrow @safe @nogc shared {
lock();

if (cb.prev_ !is null) {
Expand All @@ -306,7 +305,7 @@ public:
// Callback has either already executed or is executing
// concurrently on another thread.

if (signallingThread_ is Thread.getThis()) {
if (signallingThread_ is getThis()) {
// Callback executed on this thread or is still currently executing
// and is deregistering itself from within the callback.
if (cb.isRemoved_ !is null) {
Expand Down Expand Up @@ -342,7 +341,11 @@ private:
return is_stop_requested(state) || (state >= source_ref_increment);
}

bool try_lock_and_signal_until_signalled() nothrow @safe @nogc {
shared(Thread) getThis() @trusted @nogc nothrow shared {
return cast(shared)Thread.getThis();
}

bool try_lock_and_signal_until_signalled() nothrow @safe @nogc shared {
ulong oldState;
do {
oldState = state_.atomicLoad!(MemoryOrder.acq);
Expand All @@ -360,7 +363,7 @@ private:
return true;
}

void lock() nothrow @safe @nogc {
void lock() nothrow @safe @nogc shared {
ulong oldState;
do {
oldState = state_.atomicLoad!(MemoryOrder.raw);
Expand All @@ -373,19 +376,19 @@ private:
oldState | locked_flag));
}

void unlock() nothrow @safe @nogc {
void unlock() nothrow @safe @nogc shared {
// TODO: want to use atomicFetchSub but (proper) support is only recent
// state_.atomicFetchSub!(MemoryOrder.rel)(locked_flag);
state_.atomicOp!"-="(locked_flag);
}

void unlock_and_increment_token_ref_count() nothrow @safe @nogc {
void unlock_and_increment_token_ref_count() nothrow @safe @nogc shared {
// TODO: want to use atomicFetchSub but (proper) support is only recent
// state_.atomicFetchSub!(MemoryOrder.rel)(locked_flag - token_ref_increment);
state_.atomicOp!"-="(locked_flag - token_ref_increment);
}

void unlock_and_decrement_token_ref_count() nothrow @safe @nogc {
void unlock_and_decrement_token_ref_count() nothrow @safe @nogc shared {
// TODO: want to use atomicFetchSub but (proper) support is only recent
// state_.atomicFetchSub!(MemoryOrder.acq_rel)(locked_flag + token_ref_increment);
state_.atomicOp!"-="(locked_flag + token_ref_increment);
Expand Down
27 changes: 27 additions & 0 deletions tests/ut/concurrency/asyncscope.d
Original file line number Diff line number Diff line change
Expand Up @@ -182,3 +182,30 @@ auto waitingTask() {
s.spawn(VoidSender().withScheduler(localThreadScheduler));
s.cleanup.syncWait.assumeOk;
}

@("onComplete.inline")
@safe unittest {
auto s = asyncScope();

s.stop();
s.onComplete().syncWait.assumeOk;
}

@("onComplete.wait")
@safe unittest {
import concurrency.thread : ThreadSender;
import concurrency.stoptoken : StopSource;
import concurrency.operations : then;

auto source = new shared StopSource();
auto s = asyncScope(source);

s.spawn(ThreadSender().then(() shared @trusted {
import core.thread : Thread;
import core.time : msecs;
Thread.sleep(10.msecs);
source.stop();
}));

s.onComplete().syncWait.assumeOk;
}