Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent re-entrant execution of finalizers #10602

Merged
merged 8 commits into from
Jul 22, 2024
3 changes: 3 additions & 0 deletions build/build/src/engine/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,9 @@ impl RunContext {
// sbt:warning: java.lang.ClassNotFoundException:
// org.enso.interpreter.node.expression.builtin.bool.True
ide_ci::fs::remove_if_exists(&self.paths.repo_root.engine.runtime.target)?;
// cleanup distribution from previous build
// it is fast to assemble it again
ide_ci::fs::remove_if_exists(&self.paths.repo_root.built_distribution)?;
JaroslavTulach marked this conversation as resolved.
Show resolved Hide resolved

// We want to start this earlier, and await only before Engine build starts.
let perhaps_generate_java_from_rust_job =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ class RuntimeManagementTest extends InterpreterTest {
"""import Standard.Base.Runtime.Thread
|import Standard.Base.IO
|import Standard.Base.Nothing
|import Standard.Base.Data.Numbers.Number
|
|foo x =
| if x == 0 then IO.println "Start." else Nothing
Expand Down Expand Up @@ -84,8 +85,8 @@ class RuntimeManagementTest extends InterpreterTest {
if (round % 10 == 0) {
forceGC();
}
val res = eval("main a b = a * b").execute(7, 6)
assertResult(42)(res.asInt)
val res = eval("main a b = a + b").execute("Hello", "Enso")
assertResult("HelloEnso")(res.asString)
Thread.sleep(100)
totalOut ++= consumeOut
}
Expand Down Expand Up @@ -130,6 +131,7 @@ class RuntimeManagementTest extends InterpreterTest {
|from Standard.Base.Runtime.Resource import Managed_Resource
|import Standard.Base.IO
|import Standard.Base.Nothing
|import Standard.Base.Data.Numbers.Number
|
|type Mock_File
| Value i
Expand Down Expand Up @@ -164,6 +166,7 @@ class RuntimeManagementTest extends InterpreterTest {
|from Standard.Base.Runtime.Resource import Managed_Resource
|import Standard.Base.IO
|import Standard.Base.Nothing
|import Standard.Base.Data.Numbers.Number
|
|type Mock_File
| Value i
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,15 +358,19 @@ private Object attachWarnings(Object vector, Set<String> warnings) {
}

private Object attachDifferentComparatorsWarning(Object vector, List<Group> groups) {
var diffCompsMsg =
groups.stream()
.map(Group::comparator)
.map(comparator -> comparator.getQualifiedName().toString())
.collect(Collectors.joining(", "));
var text = Text.create("Different comparators: [" + diffCompsMsg + "]");
var ctx = EnsoContext.get(this);
var warn = Warning.create(ctx, text, this);
return WithWarnings.appendTo(ctx, vector, false, warn);
if (groups.size() > 1) {
var diffCompsMsg =
groups.stream()
.map(Group::comparator)
.map(comparator -> comparator.getQualifiedName().toString())
.collect(Collectors.joining(", "));
var text = Text.create("Different comparators: [" + diffCompsMsg + "]");
var ctx = EnsoContext.get(this);
var warn = Warning.create(ctx, text, this);
return WithWarnings.appendTo(ctx, vector, false, warn);
} else {
return vector;
}
}

private String getDefaultComparatorQualifiedName() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,21 @@
import java.lang.ref.PhantomReference;
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.enso.interpreter.runtime.data.ManagedResource;

/** Allows the context to attach garbage collection hooks on the removal of certain objects. */
public class ResourceManager {
public final class ResourceManager {
private final EnsoContext context;
private volatile boolean isClosed = false;
private volatile Thread workerThread;
private final Runner worker = new Runner();
private final ProcessItems worker = new ProcessItems();
private final ReferenceQueue<ManagedResource> referenceQueue = new ReferenceQueue<>();
private final ConcurrentMap<PhantomReference<ManagedResource>, Item> items =
new ConcurrentHashMap<>();
Expand All @@ -39,10 +40,9 @@ public ResourceManager(EnsoContext context) {
*
* @param resource the resource to park.
*/
@CompilerDirectives.TruffleBoundary
public void park(ManagedResource resource) {
if (resource.getPhantomReference() instanceof Item it) {
it.getParkedCount().incrementAndGet();
it.park();
}
}

Expand All @@ -52,14 +52,19 @@ public void park(ManagedResource resource) {
*
* @param resource the resource to unpark.
*/
@CompilerDirectives.TruffleBoundary
public void unpark(ManagedResource resource) {
if (resource.getPhantomReference() instanceof Item it) {
it.getParkedCount().decrementAndGet();
scheduleFinalizationAtSafepoint(it);
if (it.unpark(context)) {
removeFromItems(it);
}
}
}

@CompilerDirectives.TruffleBoundary
private Item removeFromItems(PhantomReference<ManagedResource> it) {
return items.remove(it);
}

/**
* Manually and unconditionally finalizes the resource. Ignores the parking mechanism, assuming
* the user now has full control over the resource.
Expand All @@ -69,7 +74,7 @@ public void unpark(ManagedResource resource) {
@CompilerDirectives.TruffleBoundary
public void close(ManagedResource resource) {
if (resource.getPhantomReference() instanceof Item it) {
items.remove(it);
removeFromItems(it);
// Unconditional finalization – user controls the resource manually.
it.finalizeNow(context);
}
Expand All @@ -83,39 +88,7 @@ public void close(ManagedResource resource) {
*/
@CompilerDirectives.TruffleBoundary
public void take(ManagedResource resource) {
items.remove(resource.getPhantomReference());
}

private void scheduleFinalizationAtSafepoint(Item it) {
if (it.isFlaggedForFinalization().get()) {
if (it.getParkedCount().get() == 0) {
// We already know that isFlaggedForFinalization was true at some
// point and there are no other threads still parking the underlying
// value. Note that it is impossible for parked count to increase after
// the value is flagged for finalization, as parking the value requires
// a live reference. We need to check if another thread didn't reach
// here earlier to perform the finalization and reset the flag, so that
// no further attempts are made.
boolean continueFinalizing = it.isFlaggedForFinalization().compareAndSet(true, false);
if (continueFinalizing) {
var futureToCancel = new AtomicReference<Future<Void>>(null);
var performFinalizeNow =
new ThreadLocalAction(false, false, true) {
@Override
protected void perform(ThreadLocalAction.Access access) {
var tmp = futureToCancel.getAndSet(null);
if (tmp == null) {
return;
}
tmp.cancel(false);
it.finalizeNow(context);
items.remove(it);
}
};
futureToCancel.set(context.submitThreadLocal(null, performFinalizeNow));
}
}
}
removeFromItems(resource.getPhantomReference());
}

/**
Expand Down Expand Up @@ -165,7 +138,7 @@ public void shutdown() {
}
}
for (PhantomReference<ManagedResource> key : items.keySet()) {
Item it = items.remove(key);
Item it = removeFromItems(key);
if (it != null) {
// Finalize unconditionally – all other threads are dead by now.
it.finalizeNow(context);
Expand All @@ -174,21 +147,82 @@ public void shutdown() {
}

/**
* The worker action for the underlying logic of this module. At least one such thread must be
* spawned in order for this module to be operational.
* Processes {@link Item}s eligible for GC. Plays two roles. First of all cleans {@link
* #referenceQueue} in {@link #run()} method running in its own thread. Then it invokes finalizers
* in {@link #perform} method inside of Enso execution context.
*/
private class Runner implements Runnable {
private final class ProcessItems extends ThreadLocalAction implements Runnable {
/**
* @GuardedBy("pendingItems")
*/
JaroslavTulach marked this conversation as resolved.
Show resolved Hide resolved
private final List<Item> pendingItems = new ArrayList<>();

/**
* @GuardedBy("pendingItems")
*/
private Future<Void> request;

private volatile boolean killed = false;

ProcessItems() {
super(false, false, true);
}

/**
* Runs at a safe point in middle of regular Enso program execution. Gathers all available
* {@link #pendingItems} and runs their finalizers. Removes all processed items from {@link
* #pendingItems}. If there are any remaining, continues processing them. Otherwise finishes.
*
* @param access not used for anything
*/
@Override
protected void perform(ThreadLocalAction.Access access) {
for (; ; ) {
Item[] toProcess;
synchronized (pendingItems) {
request.cancel(false);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

request is guaranteed to be non-null at this point?

Copy link
Member Author

@JaroslavTulach JaroslavTulach Jul 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be non-null. To get into this process method, a call to submitThreadLocal must be made and it assigns the request.

The request is only assigned back to null in this method, just before return - after this check.

E.g. unless there is some re-entrant invocation of the process method (it was there, but I hopefully fixed it), request shall not be null at this point.

Copy link
Collaborator

@hubertp hubertp Jul 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I can see that gets set there along with adding to pendingItems. But it wasn't obvious that perform can't be called with an empty pendingItems and then it would crash.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we maybe include an assert at least?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's a difference between NullPointerException and AssertionError?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Usually the NPE can be delayed and happen somewhere down the line, making debugging harder.

In this case I guess you are right - no meaningful difference. I just don't like NPEs so that was by habit :)

if (pendingItems.isEmpty()) {
// nothing to process,
// signal request is finished
request = null;
return;
}
toProcess = pendingItems.toArray(Item[]::new);
// mark as being processed
pendingItems.set(0, null);
}
try {
for (var it : toProcess) {
it.finalizeNow(context);
removeFromItems(it);
}
} finally {
synchronized (pendingItems) {
pendingItems.subList(0, toProcess.length).clear();
}
}
}
}

/**
* Running in its own thread. Waiting for {@link #referenceQueue} to be populated with GCed
* items. Scheduling {@link #perform} action at safe points while passing the {@link Item}s to
* it via {@link #pendingItems}.
*/
@Override
public void run() {
while (true) {
try {
Reference<? extends ManagedResource> ref = referenceQueue.remove();
if (!killed) {
if (ref instanceof Item it) {
it.isFlaggedForFinalization().set(true);
scheduleFinalizationAtSafepoint(it);
it.flaggedForFinalization.set(true);
synchronized (pendingItems) {
if (request == null) {
request = context.submitThreadLocal(null, this);
Copy link
Member Author

@JaroslavTulach JaroslavTulach Jul 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is the submitThreadLocal javadoc.

If the threads array is null then the thread local action will be performed on all alive threads

Right now, when we run single threaded, one thread will pick the action. In the future, multiple threads may execute the perform method. In such situation the request may actually become null for some (slower) threads.

Using recurring events should be preferred

The ProcessItems constructor marks the ThreadLocalAction as recurring to make sure some thread will pick our action up.

ThreadLocalAction javadoc is also available.

Asynchronous thread-local actions might start and complete to perform independently of each other.

Yes, we want asynchronous action, as we only want to run the action on a single thread. We don't care about others.

Copy link
Member Author

@JaroslavTulach JaroslavTulach Jul 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Continues at the next PR.

}
pendingItems.add(it);
}
}
}
if (killed) {
Expand All @@ -209,16 +243,32 @@ public void run() {
*
* @param killed whether the thread should stop execution upon reading the flag.
*/
public void setKilled(boolean killed) {
void setKilled(boolean killed) {
this.killed = killed;
}
}

/** A storage representation of a finalizable object handled by this system. */
private static class Item extends PhantomReference<ManagedResource> {
private static final class Item extends PhantomReference<ManagedResource> {
private final Object underlying;
private final Object finalizer;

/**
* Returns the counter of actions parking this object. The object can be safely finalized only
* if it's unreachable {@link #isFlaggedForFinalization()} and this counter is zero.
*
* @return the parking actions counter
*/
private final AtomicInteger parkedCount = new AtomicInteger();

/**
* Returns the boolean representing finalization status of this object. The object should be
* removed by the first thread that observes this flag to be set to true and the {@link
* #getParkedCount()} to be zero. If a thread intends to perform the finalization, it should set
* this flag to {@code false}.
*
* @return the finalization flag
*/
private final AtomicBoolean flaggedForFinalization = new AtomicBoolean();

/**
Expand All @@ -229,7 +279,7 @@ private static class Item extends PhantomReference<ManagedResource> {
* @param reference a phantom reference used for tracking the reachability status of the
* resource.
*/
public Item(
private Item(
ManagedResource referent,
Object underlying,
Object finalizer,
Expand All @@ -245,34 +295,31 @@ public Item(
*
* @param context current execution context
*/
public void finalizeNow(EnsoContext context) {
@CompilerDirectives.TruffleBoundary
private void finalizeNow(EnsoContext context) {
try {
InteropLibrary.getUncached(finalizer).execute(finalizer, underlying);
} catch (Exception e) {
context.getErr().println("Exception in finalizer: " + e.getMessage());
}
}

/**
* Returns the counter of actions parking this object. The object can be safely finalized only
* if it's unreachable {@link #isFlaggedForFinalization()} and this counter is zero.
*
* @return the parking actions counter
*/
public AtomicInteger getParkedCount() {
return parkedCount;
private void park() {
parkedCount.incrementAndGet();
}

/**
* Returns the boolean representing finalization status of this object. The object should be
* removed by the first thread that observes this flag to be set to true and the {@link
* #getParkedCount()} to be zero. If a thread intends to perform the finalization, it should set
* this flag to {@code false}.
*
* @return the finalization flag
* @return {@code true} if the finalizer was run
*/
public AtomicBoolean isFlaggedForFinalization() {
return flaggedForFinalization;
private boolean unpark(EnsoContext context) {
if (parkedCount.decrementAndGet() == 0) {
boolean continueFinalizing = flaggedForFinalization.compareAndSet(true, false);
if (continueFinalizing) {
finalizeNow(context);
return true;
}
}
return false;
}
}
}
Loading
Loading