Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent re-entrant execution of finalizers #10602

Merged
merged 8 commits into from
Jul 22, 2024
3 changes: 3 additions & 0 deletions build/build/src/engine/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,9 @@ impl RunContext {
// sbt:warning: java.lang.ClassNotFoundException:
// org.enso.interpreter.node.expression.builtin.bool.True
ide_ci::fs::remove_if_exists(&self.paths.repo_root.engine.runtime.target)?;
// cleanup distribution from previous build
// it is fast to assemble it again
ide_ci::fs::remove_if_exists(&self.paths.repo_root.built_distribution)?;
JaroslavTulach marked this conversation as resolved.
Show resolved Hide resolved

// We want to start this earlier, and await only before Engine build starts.
let perhaps_generate_java_from_rust_job =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ class RuntimeManagementTest extends InterpreterTest {
"""import Standard.Base.Runtime.Thread
|import Standard.Base.IO
|import Standard.Base.Nothing
|import Standard.Base.Data.Numbers.Number
|
|foo x =
| if x == 0 then IO.println "Start." else Nothing
Expand Down Expand Up @@ -84,8 +85,8 @@ class RuntimeManagementTest extends InterpreterTest {
if (round % 10 == 0) {
forceGC();
}
val res = eval("main a b = a * b").execute(7, 6)
assertResult(42)(res.asInt)
val res = eval("main a b = a + b").execute("Hello", "Enso")
assertResult("HelloEnso")(res.asString)
Thread.sleep(100)
totalOut ++= consumeOut
}
Expand Down Expand Up @@ -130,6 +131,7 @@ class RuntimeManagementTest extends InterpreterTest {
|from Standard.Base.Runtime.Resource import Managed_Resource
|import Standard.Base.IO
|import Standard.Base.Nothing
|import Standard.Base.Data.Numbers.Number
|
|type Mock_File
| Value i
Expand Down Expand Up @@ -164,6 +166,7 @@ class RuntimeManagementTest extends InterpreterTest {
|from Standard.Base.Runtime.Resource import Managed_Resource
|import Standard.Base.IO
|import Standard.Base.Nothing
|import Standard.Base.Data.Numbers.Number
|
|type Mock_File
| Value i
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,20 @@
import java.lang.ref.PhantomReference;
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.enso.interpreter.runtime.data.ManagedResource;

/** Allows the context to attach garbage collection hooks on the removal of certain objects. */
public class ResourceManager {
public final class ResourceManager {
private final EnsoContext context;
private volatile boolean isClosed = false;
private volatile Thread workerThread;
private final Runner worker = new Runner();
private final ProcessItems worker = new ProcessItems();
private final ReferenceQueue<ManagedResource> referenceQueue = new ReferenceQueue<>();
private final ConcurrentMap<PhantomReference<ManagedResource>, Item> items =
new ConcurrentHashMap<>();
Expand Down Expand Up @@ -98,21 +98,8 @@ private void scheduleFinalizationAtSafepoint(Item it) {
// no further attempts are made.
boolean continueFinalizing = it.isFlaggedForFinalization().compareAndSet(true, false);
if (continueFinalizing) {
var futureToCancel = new AtomicReference<Future<Void>>(null);
var performFinalizeNow =
new ThreadLocalAction(false, false, true) {
@Override
protected void perform(ThreadLocalAction.Access access) {
var tmp = futureToCancel.getAndSet(null);
if (tmp == null) {
return;
}
tmp.cancel(false);
it.finalizeNow(context);
items.remove(it);
}
};
futureToCancel.set(context.submitThreadLocal(null, performFinalizeNow));
it.finalizeNow(context);
items.remove(it);
}
}
}
Expand Down Expand Up @@ -174,12 +161,66 @@ public void shutdown() {
}

/**
* The worker action for the underlying logic of this module. At least one such thread must be
* spawned in order for this module to be operational.
* Processes {@link Item}s eligible for GC. Plays two roles. First of all cleans {@link
* #referenceQueue} in {@link #run()} method running in its own thread. Then it invokes finalizers
* in {@link #perform} method inside of Enso execution context.
*/
private class Runner implements Runnable {
private final class ProcessItems extends ThreadLocalAction implements Runnable {
/**
* @GuardedBy("pendingItems")
*/
JaroslavTulach marked this conversation as resolved.
Show resolved Hide resolved
private final List<Item> pendingItems = new ArrayList<>();

private volatile boolean killed = false;

ProcessItems() {
super(false, false, true);
}

/**
* Runs at a safe point in middle of regular Enso program execution. Gathers all available
* {@link #pendingItems} and runs their finalizers. Removes all processed items from {@link
* #pendingItems}. If there are any remaining, continues processing them. Otherwise finishes.
*
* @param access not used for anything
*/
@Override
protected void perform(ThreadLocalAction.Access access) {
for (; ; ) {
Item[] toProcess;
synchronized (pendingItems) {
if (pendingItems.isEmpty() || pendingItems.get(0) == null) {
// nothing to process or already processing
// avoids re-entrant calls into this method
return;
}
toProcess = pendingItems.toArray(Item[]::new);
// mark as being processed
pendingItems.set(0, null);
JaroslavTulach marked this conversation as resolved.
Show resolved Hide resolved
}
try {
for (var it : toProcess) {
scheduleFinalizationAtSafepoint(it);
}
} finally {
synchronized (pendingItems) {
assert pendingItems.size() >= toProcess.length
: "Just processed "
+ toProcess.length
+ " but there is only "
+ pendingItems.size()
+ " to clear";
pendingItems.subList(0, toProcess.length).clear();
}
}
}
}

/**
* Running in its own thread. Waiting for {@link #referenceQueue} to be populated with GCed
* items. Scheduling {@link #perform} action at safe points while passing the {@link Item}s to
* it via {@link #pendingItems}.
*/
@Override
public void run() {
while (true) {
Expand All @@ -188,7 +229,12 @@ public void run() {
if (!killed) {
if (ref instanceof Item it) {
it.isFlaggedForFinalization().set(true);
scheduleFinalizationAtSafepoint(it);
synchronized (pendingItems) {
if (pendingItems.isEmpty()) {
context.submitThreadLocal(null, this);
}
pendingItems.add(it);
}
}
}
if (killed) {
Expand All @@ -209,13 +255,13 @@ public void run() {
*
* @param killed whether the thread should stop execution upon reading the flag.
*/
public void setKilled(boolean killed) {
void setKilled(boolean killed) {
this.killed = killed;
}
}

/** A storage representation of a finalizable object handled by this system. */
private static class Item extends PhantomReference<ManagedResource> {
private static final class Item extends PhantomReference<ManagedResource> {
private final Object underlying;
private final Object finalizer;
private final AtomicInteger parkedCount = new AtomicInteger();
Expand All @@ -229,7 +275,7 @@ private static class Item extends PhantomReference<ManagedResource> {
* @param reference a phantom reference used for tracking the reachability status of the
* resource.
*/
public Item(
private Item(
ManagedResource referent,
Object underlying,
Object finalizer,
Expand All @@ -245,7 +291,7 @@ public Item(
*
* @param context current execution context
*/
public void finalizeNow(EnsoContext context) {
private void finalizeNow(EnsoContext context) {
try {
InteropLibrary.getUncached(finalizer).execute(finalizer, underlying);
} catch (Exception e) {
Expand All @@ -259,7 +305,7 @@ public void finalizeNow(EnsoContext context) {
*
* @return the parking actions counter
*/
public AtomicInteger getParkedCount() {
private AtomicInteger getParkedCount() {
return parkedCount;
}

Expand All @@ -271,7 +317,7 @@ public AtomicInteger getParkedCount() {
*
* @return the finalization flag
*/
public AtomicBoolean isFlaggedForFinalization() {
private AtomicBoolean isFlaggedForFinalization() {
return flaggedForFinalization;
}
}
Expand Down
41 changes: 41 additions & 0 deletions test/Base_Tests/src/Runtime/GC_Example.enso
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
from Standard.Base import all
import Standard.Base.Runtime.Managed_Resource.Managed_Resource
import Standard.Base.Runtime.Ref.Ref

type My_Resource
Value counter:Ref

close self =
self.counter.modify (x-> x-1)
Nothing

allocate counter:Ref =
counter.modify (+1)
Managed_Resource.register (My_Resource.Value counter) close_resource

close_resource resource = resource.close

repeat_cleanup_until_done counter =
go i =
if counter.get == 0 then Nothing else
if i % 100 == 0 then
IO.println "Still "+counter.get.to_text+" resources to clean up..."
Runtime.gc
@Tail_Call go i+1
go 1

perform_test n:Integer println =

counter = Ref.new 0
println "Allocating "+n.to_text+" resources..."
0.up_to n . each _->
My_Resource.allocate counter

println "Cleaning up..."
repeat_cleanup_until_done counter
println "All cleaned up! Remaining: "+counter.get.to_text
counter.get

main n=1000000 =
perform_test n IO.println

5 changes: 5 additions & 0 deletions test/Base_Tests/src/Runtime/Managed_Resource_Spec.enso
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ from Standard.Base import all
import Standard.Base.Data.Vector.Builder
import Standard.Base.Errors.Illegal_State.Illegal_State
import Standard.Base.Runtime.Managed_Resource.Managed_Resource
import project.Runtime.GC_Example

from Standard.Test import all

Expand Down Expand Up @@ -57,6 +58,10 @@ add_specs suite_builder = suite_builder.group "Managed_Resource" group_builder->
r_3 = Panic.recover Any <| Managed_Resource.bracket 42 (_-> Nothing) (_-> Panic.throw "action")
r_3.catch . should_equal "action"

group_builder.specify "allocate lots of resources at once" <|
Copy link
Member Author

@JaroslavTulach JaroslavTulach Jul 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test was disabled by

Attempt to diagnose what the problem was is at

remaining = GC_Example.perform_test 100000 (_->Nothing)
remaining . should_equal 0

main filter=Nothing =
suite = Test.build suite_builder->
add_specs suite_builder
Expand Down
Loading