Skip to content

Commit

Permalink
Update Root.Controller interface.
Browse files Browse the repository at this point in the history
Change start() to accept an optional Lookup.
Add isAlive() and awaitTermination() methods.
  • Loading branch information
neilcsmith-net committed Oct 13, 2023
1 parent fd35484 commit b9d8704
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 48 deletions.
94 changes: 65 additions & 29 deletions praxiscore-api/src/main/java/org/praxislive/core/Root.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
*
* Copyright 2018 Neil C Smith.
* Copyright 2023 Neil C Smith.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License version 3 only, as
Expand All @@ -21,77 +21,113 @@
*/
package org.praxislive.core;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
* Root provides the companion part of the actor-model to Component within
* Root provides the companion part of the actor-model to Component within
* PraxisCORE's forest-of-actors model. The Root implementation handles initial
* message handling and scheduling. There may be many Roots within a running
* PraxisCORE system - the Roots are sandboxed from each other and the RootHub
* handles delivery of messages (Packet / Call) from one Root to another.
*
* <p>
* A Root may be a Component or Container, but this is not required. As
* Component implementations are intended to be lock-free and single-threaded,
* the Root implementation will ensure that all messages are handled serially.
* Some Root implementations will have a one-to-one relationship to a thread of
* execution.
*
*
*
*
*/
public interface Root extends Lookup.Provider {
public interface Root extends Lookup.Provider {

/**
* Method used by the RootHub to initialize the Root and obtain a Controller.
* Root implementations will ensure this method can only be invoked once.
*
* Method used by the RootHub to initialize the Root and obtain a
* Controller. Root implementations will ensure this method can only be
* invoked once.
*
* @param ID the unique ID of this Root
* @param hub the RootHub the Root resides within
* @return Controller for use by the RootHub instance
* @throws IllegalStateException if the Root has already been initialized
*/
public Root.Controller initialize(String ID, RootHub hub);

/**
* An interface used by the RootHub to control the lifecycle of, and
* communicate with, the Root.
*/
public interface Controller {

/**
* Deliver a Packet to this Root. This method is intended to be called
* from a thread other than the primary thread of the Root. It will add
* the packet to a queue and return immediately - this method will never
* block as it may be called from the thread of another Root.
*
* <p>
* This method will return true if the Packet can be handled (see eg.
* BlockingQueue::offer)
*
*
* @param packet message (see Packet / Call) to handle
* @return true if the packet can be handled
*/
public boolean submitPacket(Packet packet);

/**
* Start the Root. If the Root implementation requires a primary thread
* to run on it will use the supplied ThreadFactory so that the RootHub
* can manage thread creation. The ThreadFactory is not required to
* support the creation of more than one Thread.
*
* Controller implementations will ensure that this method can only be
* invoked once.
*
* @param threadFactory used if the Root requires a thread to run on.
* Start the Root. Controller implementations will ensure that this
* method can only be invoked once.
* <p>
* The lookup may contain services that the root may utilise - eg. a
* shared {@link ScheduledExecutorService}. This allows services to be
* controlled on a per-root basis, rather than relying on the hub
* lookup.
*
* @param lookup optional services for the root to use
* @throws IllegalStateException if the Root has already been started.
*/
public void start(ThreadFactory threadFactory);

public void start(Lookup lookup);

/**
* Convenience method to call {@link #start(org.praxislive.core.Lookup)}
* with an empty lookup.
*/
public default void start() {
start(Lookup.EMPTY);
}

/**
* Signal the Root to be shutdown. This method is intended to be called
* asynchronously and will return immediately - it will not wait for the
* Root to actually complete execution.
*/
public void shutdown();


/**
* Query whether the Root is alive - has been started and has not been
* signalled to shutdown. This method may return false even if the Root
* is still in the process of termination. If the caller needs to know
* when the Root has finished termination, use
* {@link #awaitTermination(long, java.util.concurrent.TimeUnit)}.
*
* @return true if started and not signalled to shutdown
*/
public boolean isAlive();

/**
* Wait for the Root to fully terminate, freeing up all resources.
*
* @param timeout maximum time to wait
* @param unit unit of timeout
* @throws InterruptedException if the current thread is interrupted
* @throws TimeoutException if the timeout is reached before termination
* @throws ExecutionException if the termination is the result of an
* exception
*/
public void awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException, TimeoutException, ExecutionException;

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,15 @@
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
Expand Down Expand Up @@ -574,6 +577,7 @@ protected static DelegateConfiguration delegateConfig() {
protected class Controller implements Root.Controller {

private final AtomicBoolean updateQueued = new AtomicBoolean();
private final CompletableFuture<Void> activeFuture = new CompletableFuture<>();

private ScheduledExecutorService exec;
private ScheduledFuture<?> updateTask;
Expand All @@ -590,10 +594,11 @@ public boolean submitPacket(Packet packet) {
}

@Override
public void start(ThreadFactory threadFactory) {
public void start(Lookup lookup) {
if (state.compareAndSet(State.INITIALIZED, State.ACTIVE_IDLE)) {
this.threadFactory = threadFactory;
this.exec = hub.getLookup()
String threadID = getAddress().rootID();
threadFactory = r -> new Thread(r, threadID);
this.exec = lookup
.find(ScheduledExecutorService.class)
.orElse(null);
if (exec == null) {
Expand All @@ -612,6 +617,18 @@ public void shutdown() {
? State.TERMINATED : State.TERMINATING);
}

@Override
public boolean isAlive() {
var s = state.get();
return s == State.ACTIVE_RUNNING || s == State.ACTIVE_IDLE;
}

@Override
public void awaitTermination(long timeout, TimeUnit unit) throws
InterruptedException, TimeoutException, ExecutionException {
activeFuture.get(timeout, unit);
}

/**
* Called on receipt of a {@link Packet} (Call) or a Runnable task. The
* default implementation will call {@link Delegate#onQueueReceipt()} if
Expand Down Expand Up @@ -687,8 +704,10 @@ private void doTerminate() {
}
context.updateState(hub.getClock().getTime(), ExecutionContext.State.TERMINATED);
if (ownsScheduler) {
// we're running in scheduler so cannot wait for it!
exec.shutdown();
}
activeFuture.complete(null);
} finally {
lock.unlock();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public void testProcessCall() {
RootImpl root = new RootImpl();
LinkedBlockingQueue<Packet> responseQueue = new LinkedBlockingQueue<>();
RootHubImpl hub = new RootHubImpl(root, responseQueue);
hub.ctrl.start(Thread::new);
hub.ctrl.start();
hub.ctrl.submitPacket(Call.create(ControlAddress.of("/test.hello"),
ControlAddress.of("/hub.world"),
hub.getClock().getTime() + TimeUnit.MILLISECONDS.toNanos(100)));
Expand All @@ -96,7 +96,7 @@ public void testDelegateProcessCall() {
DelegatingRootImpl root = new DelegatingRootImpl();
LinkedBlockingQueue<Packet> responseQueue = new LinkedBlockingQueue<>();
RootHubImpl hub = new RootHubImpl(root, responseQueue);
hub.ctrl.start(Thread::new);
hub.ctrl.start();
hub.ctrl.submitPacket(Call.create(ControlAddress.of("/test.hello"),
ControlAddress.of("/hub.world"),
hub.getClock().getTime() + TimeUnit.MILLISECONDS.toNanos(100)));
Expand All @@ -119,7 +119,7 @@ public void testCallResponseAfterShutdown() {
RootImpl root = new RootImpl();
LinkedBlockingQueue<Packet> responseQueue = new LinkedBlockingQueue<>();
RootHubImpl hub = new RootHubImpl(root, responseQueue);
hub.ctrl.start(Thread::new);
hub.ctrl.start();
hub.ctrl.submitPacket(Call.create(ControlAddress.of("/test.hello"),
ControlAddress.of("/hub.world"),
hub.getClock().getTime() + TimeUnit.SECONDS.toNanos(1)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.praxislive.core.Root;
import org.praxislive.core.RootHub;
import org.praxislive.core.Control;
import org.praxislive.core.Lookup;
import org.praxislive.core.PacketRouter;
import org.praxislive.core.Value;
import org.praxislive.core.services.RootFactoryService;
Expand Down Expand Up @@ -253,7 +254,7 @@ protected final Root.Controller uninstallRoot(String id) {
* @param ctrl root controller
*/
protected final void startRoot(final String id, final Root.Controller ctrl) {
ctrl.start(r -> new Thread(r, id));
ctrl.start();
}

/**
Expand Down
28 changes: 16 additions & 12 deletions praxiscore-hub/src/main/java/org/praxislive/hub/Hub.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -67,7 +68,6 @@ public final class Hub {
private final RootHubImpl rootHub;
private final List<String> rootIDs;

private Thread coreThread;
private Root.Controller coreController;
long startTime;

Expand Down Expand Up @@ -104,18 +104,14 @@ private void extractExtensions(Builder builder, List<Root> exts) {
* @throws Exception if start fails or the hub has already been started
*/
public synchronized void start() throws Exception {
if (coreThread != null) {
if (coreController != null) {
throw new IllegalStateException();
}
startTime = System.nanoTime();
String coreID = CORE_PREFIX + Integer.toHexString(core.hashCode());
coreController = core.initialize(coreID, rootHub);
roots.put(coreID, coreController);
coreController.start(r -> {
coreThread = new Thread(r, "PRAXIS_CORE_THREAD");
return coreThread;
});
assert coreThread.isAlive();
coreController.start(Lookup.EMPTY);
}

/**
Expand All @@ -132,7 +128,14 @@ public void shutdown() {
* @throws InterruptedException if interrupted
*/
public void await() throws InterruptedException {
coreThread.join();
while (true) {
try {
await(1, TimeUnit.MINUTES);
return;
} catch (TimeoutException ex) {
// loop again
}
}
}

/**
Expand All @@ -144,9 +147,10 @@ public void await() throws InterruptedException {
* @throws TimeoutException if the hub has not terminated in the given time
*/
public void await(long time, TimeUnit unit) throws InterruptedException, TimeoutException {
coreThread.join(unit.toMillis(time));
if (coreThread.isAlive()) {
throw new TimeoutException();
try {
coreController.awaitTermination(time, unit);
} catch (ExecutionException ex) {
return;
}
}

Expand All @@ -156,7 +160,7 @@ public void await(long time, TimeUnit unit) throws InterruptedException, Timeout
* @return true if active
*/
public boolean isAlive() {
return coreThread.isAlive();
return coreController.isAlive();
}

/**
Expand Down

0 comments on commit b9d8704

Please sign in to comment.