Skip to content

Commit

Permalink
[GR-20446] Relax Fiber#transfer limitation
Browse files Browse the repository at this point in the history
PullRequest: truffleruby/2637
  • Loading branch information
bjfish committed Nov 5, 2021
2 parents d68a14f + 0967b59 commit d59b3fb
Show file tree
Hide file tree
Showing 12 changed files with 210 additions and 66 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ Compatibility:
* Implement `Proc#{==,eql?}` (#2453).
* Implement all `StringScanner` methods (#2520, @eregon).
* Handle `Kernel#clone(freeze: true)` (#2512, @andrykonchin).
* Relax `Fiber#transfer` limitations (#2453).

Performance:

Expand Down
12 changes: 12 additions & 0 deletions spec/ruby/library/fiber/resume_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@
fiber2.resume
-> { fiber2.resume }.should raise_error(FiberError)
end

it "raises a FiberError if the Fiber attempts to resume a resuming fiber" do
root_fiber = Fiber.current
fiber1 = Fiber.new { root_fiber.resume }
-> { fiber1.resume }.should raise_error(FiberError, /double resume/)
end
end

ruby_version_is '3.0' do
Expand All @@ -19,5 +25,11 @@
fiber2.resume.should == 10
fiber2.resume.should == 20
end

it "raises a FiberError if the Fiber attempts to resume a resuming fiber" do
root_fiber = Fiber.current
fiber1 = Fiber.new { root_fiber.resume }
-> { fiber1.resume }.should raise_error(FiberError, /attempt to resume a resuming fiber/)
end
end
end
3 changes: 1 addition & 2 deletions spec/tags/core/fiber/resume_tags.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
fails:Fiber#resume executes the ensure clause
fails:Fiber#resume raises a FiberError if the Fiber tries to resume itself
slow:Fiber#resume executes the ensure clause
1 change: 0 additions & 1 deletion spec/tags/library/fiber/current_tags.txt

This file was deleted.

1 change: 0 additions & 1 deletion spec/tags/library/fiber/resume_tags.txt

This file was deleted.

2 changes: 0 additions & 2 deletions spec/tags/library/fiber/transfer_tags.txt

This file was deleted.

69 changes: 54 additions & 15 deletions src/main/java/org/truffleruby/core/fiber/FiberManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

import java.util.concurrent.CountDownLatch;

import org.truffleruby.core.fiber.RubyFiber.FiberStatus;

import com.oracle.truffle.api.TruffleContext;
import com.oracle.truffle.api.TruffleSafepoint;
import org.truffleruby.RubyContext;
Expand Down Expand Up @@ -118,7 +120,7 @@ private void fiberMain(RubyContext context, RubyFiber fiber, RubyProc block, Nod
FiberMessage lastMessage = null;
try {
final Object[] args = handleMessage(fiber, message, currentNode);
fiber.resumed = true;
fiber.status = FiberStatus.RESUMED;
final Object result = ProcOperations.rootCall(block, args);

lastMessage = new FiberResumeMessage(FiberOperation.YIELD, fiber, new Object[]{ result });
Expand All @@ -144,7 +146,7 @@ private void fiberMain(RubyContext context, RubyFiber fiber, RubyProc block, Nod

// Perform all cleanup before resuming the parent Fiber
// Make sure that other fibers notice we are dead before they gain control back
fiber.alive = false;
fiber.status = FiberStatus.TERMINATED;
// Leave context before addToMessageQueue() -> parent Fiber starts executing
truffleContext.leave(currentNode, prev);
thread.threadLocalState.rubyThread = null;
Expand All @@ -161,18 +163,27 @@ public RubyFiber getReturnFiber(RubyFiber currentFiber, Node currentNode, Branch
assert currentFiber == currentFiber.rubyThread.getCurrentFiber();

final RubyFiber rootFiber = currentFiber.rubyThread.getRootFiber();
if (currentFiber == rootFiber) {
errorProfile.enter();
throw new RaiseException(context, context.getCoreExceptions().yieldFromRootFiberError(currentNode));
}

final RubyFiber parentFiber = currentFiber.lastResumedByFiber;
if (parentFiber != null) {
final RubyFiber previousFiber = currentFiber.lastResumedByFiber;
if (previousFiber != null) {
currentFiber.lastResumedByFiber = null;
return parentFiber;
previousFiber.resumingFiber = null;
return previousFiber;
} else {
return rootFiber;

if (currentFiber == rootFiber) {
errorProfile.enter();
throw new RaiseException(context, context.getCoreExceptions().yieldFromRootFiberError(currentNode));
}

RubyFiber fiber = rootFiber;
while (fiber.resumingFiber != null) {
fiber = fiber.resumingFiber;
}
return fiber;
}


}

@TruffleBoundary
Expand Down Expand Up @@ -226,6 +237,12 @@ private Object[] handleMessage(RubyFiber fiber, FiberMessage message, Node curre
assert language.getCurrentThread() == resumeMessage.getSendingFiber().rubyThread;
final FiberOperation operation = resumeMessage.getOperation();

if (operation == FiberOperation.RESUME) {
fiber.yielding = false;
}
fiber.status = FiberStatus.RESUMED;


if (operation == FiberOperation.RESUME || operation == FiberOperation.RAISE) {
fiber.lastResumedByFiber = resumeMessage.getSendingFiber();
}
Expand All @@ -247,20 +264,42 @@ private void resume(RubyFiber fromFiber, RubyFiber fiber, FiberOperation operati
}

@TruffleBoundary
public Object[] transferControlTo(RubyFiber fromFiber, RubyFiber fiber, FiberOperation operation, Object[] args,
public Object[] transferControlTo(RubyFiber fromFiber, RubyFiber toFiber, FiberOperation operation, Object[] args,
Node currentNode) {
final FiberMessage message = resumeAndWait(fromFiber, fiber, operation, args, currentNode);
assert fromFiber.resumingFiber == null;
if (operation == FiberOperation.RESUME) {
fromFiber.resumingFiber = toFiber;
}

assert !fromFiber.yielding;
if (operation == FiberOperation.YIELD) {
fromFiber.yielding = true;
}

if (fromFiber.status == FiberStatus.RESUMED) {
fromFiber.status = FiberStatus.SUSPENDED;
}
final FiberMessage message = resumeAndWait(fromFiber, toFiber, operation, args, currentNode);
return handleMessage(fromFiber, message, currentNode);
}

/** This methods switches from the currently-running fromFiber to toFiber. This method notifies toFiber to start
* executing again and then just after suspends fromFiber. We only return from this method call for fromFiber once
* control is passed back to fromFiber. As soon as toFiber is notified, it pops the message from the queue in
* waitMessage() and then calls handleMessage(). There must be no code between notifying toFiber and suspending
* fromFiber, as during that time both threads can be running, yet this does not matter semantically since fromFiber
* will suspend and nothing happens fromFiber until then.
*
* @param fromFiber the current fiber which will soon be suspended
* @param toFiber the fiber we resume or transfer to */
@TruffleBoundary
private FiberMessage resumeAndWait(RubyFiber fromFiber, RubyFiber fiber, FiberOperation operation, Object[] args,
private FiberMessage resumeAndWait(RubyFiber fromFiber, RubyFiber toFiber, FiberOperation operation, Object[] args,
Node currentNode) {
final TruffleContext truffleContext = context.getEnv().getContext();
final FiberMessage message = context
.getThreadManager()
.leaveAndEnter(truffleContext, currentNode, () -> {
resume(fromFiber, fiber, operation, args);
resume(fromFiber, toFiber, operation, args);
return waitMessage(fromFiber, currentNode);
});
fromFiber.rubyThread.setCurrentFiber(fromFiber);
Expand Down Expand Up @@ -306,7 +345,7 @@ public void cleanup(RubyFiber fiber, Thread javaThread) {

context.getValueWrapperManager().cleanup(context, fiber.handleData);

fiber.alive = false;
fiber.status = FiberStatus.TERMINATED;

threadManager.cleanupValuesForJavaThread(javaThread);

Expand Down
115 changes: 90 additions & 25 deletions src/main/java/org/truffleruby/core/fiber/FiberNodes.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.truffleruby.core.encoding.Encodings;
import org.truffleruby.core.exception.RubyException;
import org.truffleruby.core.fiber.FiberNodesFactory.FiberTransferNodeFactory;
import org.truffleruby.core.fiber.RubyFiber.FiberStatus;
import org.truffleruby.core.klass.RubyClass;
import org.truffleruby.core.proc.RubyProc;
import org.truffleruby.core.rope.CodeRange;
Expand All @@ -45,6 +46,10 @@ public abstract class FiberNodes {

public abstract static class FiberTransferNode extends CoreMethodArrayArgumentsNode {

public static FiberTransferNode create() {
return FiberTransferNodeFactory.create(null);
}

@Child private SingleValueCastNode singleValueCastNode;

public Object singleValue(Object[] args) {
Expand All @@ -67,7 +72,7 @@ protected Object transfer(
Object[] args,
@Cached BranchProfile errorProfile) {

if (!fiber.alive) {
if (fiber.isTerminated()) {
errorProfile.enter();
throw new RaiseException(getContext(), coreExceptions().deadFiberCalledError(this));
}
Expand Down Expand Up @@ -130,21 +135,32 @@ public abstract static class TransferNode extends CoreMethodArrayArgumentsNode {
@Child private FiberTransferNode fiberTransferNode = FiberTransferNodeFactory.create(null);

@Specialization
protected Object resume(RubyFiber fiber, Object[] args,
@Cached ConditionProfile sameFiberProfile) {
protected Object transfer(RubyFiber toFiber, Object[] args,
@Cached ConditionProfile sameFiberProfile,
@Cached BranchProfile errorProfile) {

fiber.transferred = true;
if (toFiber.resumingFiber != null) {
errorProfile.enter();
throw new RaiseException(getContext(), coreExceptions()
.fiberError("attempt to transfer to a resuming fiber", this));
}

if (toFiber.yielding) {
errorProfile.enter();
throw new RaiseException(getContext(), coreExceptions()
.fiberError("attempt to transfer to a yielding fiber", this));
}

final RubyThread currentThread = getLanguage().getCurrentThread();
final RubyFiber currentFiber = currentThread.getCurrentFiber();

if (sameFiberProfile.profile(currentFiber == fiber)) {
if (sameFiberProfile.profile(currentFiber == toFiber)) {
// A Fiber can transfer to itself
return fiberTransferNode.singleValue(args);
}

return fiberTransferNode
.executeTransferControlTo(currentThread, currentFiber, fiber, FiberOperation.TRANSFER, args);
.executeTransferControlTo(currentThread, currentFiber, toFiber, FiberOperation.TRANSFER, args);
}

}
Expand All @@ -161,27 +177,42 @@ public static FiberResumeNode create() {
@Child private FiberTransferNode fiberTransferNode = FiberTransferNodeFactory.create(null);

@Specialization
protected Object resume(FiberOperation operation, RubyFiber fiber, Object[] args,
@Cached ConditionProfile doubleResumeProfile,
@Cached ConditionProfile transferredProfile) {

final RubyFiber parentFiber = fiber.lastResumedByFiber;
protected Object resume(FiberOperation operation, RubyFiber toFiber, Object[] args,
@Cached BranchProfile errorProfile) {

if (doubleResumeProfile.profile(parentFiber != null || fiber.isRootFiber())) {
throw new RaiseException(getContext(), coreExceptions().fiberError("double resume", this));
}
final RubyThread currentThread = getLanguage().getCurrentThread();
final RubyFiber currentFiber = currentThread.getCurrentFiber();

if (operation != FiberOperation.RAISE && transferredProfile.profile(fiber.transferred)) {
if (toFiber.isTerminated()) {
errorProfile.enter();
throw new RaiseException(
getContext(),
coreExceptions().fiberError("attempt to resume a terminated fiber", this));
} else if (toFiber == currentFiber) {
errorProfile.enter();
throw new RaiseException(
getContext(),
coreExceptions().fiberError("cannot resume transferred Fiber", this));
coreExceptions().fiberError("attempt to resume the current fiber", this));
} else if (toFiber.lastResumedByFiber != null) {
errorProfile.enter();
throw new RaiseException(
getContext(),
coreExceptions().fiberError("attempt to resume a resumed fiber (double resume)", this));
} else if (toFiber.resumingFiber != null) {
errorProfile.enter();
throw new RaiseException(
getContext(),
coreExceptions().fiberError("attempt to resume a resuming fiber", this));
} else if (toFiber.lastResumedByFiber == null &&
(!toFiber.yielding && toFiber.status != FiberStatus.CREATED)) {
errorProfile.enter();
throw new RaiseException(
getContext(),
coreExceptions().fiberError("attempt to resume a transferring fiber", this));
}

final RubyThread currentThread = getLanguage().getCurrentThread();
final RubyFiber currentFiber = currentThread.getCurrentFiber();

return fiberTransferNode
.executeTransferControlTo(currentThread, currentFiber, fiber, operation, args);
.executeTransferControlTo(currentThread, currentFiber, toFiber, operation, args);
}

}
Expand All @@ -190,18 +221,52 @@ protected Object resume(FiberOperation operation, RubyFiber fiber, Object[] args
@Primitive(name = "fiber_raise")
public abstract static class FiberRaiseNode extends PrimitiveArrayArgumentsNode {

@Child private FiberResumeNode fiberResumeNode = FiberResumeNode.create();
@Child private FiberResumeNode fiberResumeNode;
@Child private FiberTransferNode fiberTransferNode;

@Specialization
protected Object raise(RubyFiber fiber, RubyException exception,
@Cached BranchProfile errorProfile) {
if (!fiber.resumed) {
if (fiber.resumingFiber != null) {
errorProfile.enter();
throw new RaiseException(
getContext(),
coreExceptions().fiberError("attempt to raise a resuming fiber", this));
} else if (fiber.status == FiberStatus.CREATED) {
errorProfile.enter();
throw new RaiseException(
getContext(),
coreExceptions().fiberError("cannot raise exception on unborn fiber", this));
}
return fiberResumeNode.executeResume(FiberOperation.RAISE, fiber, new Object[]{ exception });

if (fiber.status == FiberStatus.SUSPENDED && !fiber.yielding) {
final RubyThread currentThread = getLanguage().getCurrentThread();
final RubyFiber currentFiber = currentThread.getCurrentFiber();
return getTransferNode().executeTransferControlTo(
currentThread,
currentFiber,
fiber,
FiberOperation.RAISE,
new Object[]{ exception });
} else {
return getResumeNode().executeResume(FiberOperation.RAISE, fiber, new Object[]{ exception });
}
}

private FiberResumeNode getResumeNode() {
if (fiberResumeNode == null) {
CompilerDirectives.transferToInterpreterAndInvalidate();
fiberResumeNode = insert(FiberResumeNode.create());
}
return fiberResumeNode;
}

private FiberTransferNode getTransferNode() {
if (fiberTransferNode == null) {
CompilerDirectives.transferToInterpreterAndInvalidate();
fiberTransferNode = insert(FiberTransferNode.create());
}
return fiberTransferNode;
}

}
Expand Down Expand Up @@ -248,7 +313,7 @@ public abstract static class AliveNode extends UnaryCoreMethodNode {

@Specialization
protected boolean alive(RubyFiber fiber) {
return fiber.alive;
return fiber.status != FiberStatus.TERMINATED;
}

}
Expand Down Expand Up @@ -277,7 +342,7 @@ public abstract static class FiberStatusNode extends PrimitiveArrayArgumentsNode
@Specialization
protected RubyString status(RubyFiber fiber,
@Cached MakeStringNode makeStringNode) {
return makeStringNode.executeMake(fiber.getStatus(), Encodings.UTF_8, CodeRange.CR_UNKNOWN);
return makeStringNode.executeMake(fiber.status.label, Encodings.UTF_8, CodeRange.CR_UNKNOWN);
}
}

Expand Down
Loading

0 comments on commit d59b3fb

Please sign in to comment.