Skip to content

Commit

Permalink
priority
Browse files Browse the repository at this point in the history
  • Loading branch information
SentryMan committed Jan 27, 2024
1 parent dbea069 commit 4391d21
Show file tree
Hide file tree
Showing 9 changed files with 40 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -260,15 +260,17 @@ private void writeObserveMethods() {
.collect(joining(", "));
writer.append("e -> bean.%s(e, %s);", methodReader.name(), injectParamNames);
}
final var observesPrism = ObservesPrism.getInstanceOn(observeEvent.element());
writer
.eol()
.indent(indent)
.append(
"%s.<%s>registerObserver(%s, %s, %s, \"%s\");",
"%s.<%s>registerObserver(%s, %s, %s, %s, \"%s\");",
builder,
shortWithoutAnnotations,
ObservesPrism.getInstanceOn(observeEvent.element()).async().booleanValue(),
observeTypeString,
observesPrism.priority(),
observesPrism.async().booleanValue(),
methodReader.name(),
observeEvent.qualifier())
.eol();
Expand Down
8 changes: 2 additions & 6 deletions inject/src/main/java/io/avaje/inject/DObserverManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

import io.avaje.inject.events.Observer;
import io.avaje.inject.events.ObserverManager;
Expand All @@ -15,12 +14,9 @@ class DObserverManager implements ObserverManager {
Map<Type, List<Observer<?>>> observeMap = new HashMap<>();

@Override
public <T> void registerObserver(
boolean async, Type type, Consumer<T> observer, String qualifier) {
public <T> void registerObserver(Type type, Observer<T> observer) {

observeMap
.computeIfAbsent(type, k -> new ArrayList<>())
.add(new Observer<>(async, observer, qualifier));
observeMap.computeIfAbsent(type, k -> new ArrayList<>()).add(observer);
}

@Override
Expand Down
8 changes: 5 additions & 3 deletions inject/src/main/java/io/avaje/inject/events/Event.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.avaje.inject.events;

import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CompletableFuture;

Expand Down Expand Up @@ -40,9 +41,9 @@ protected Event(List<Observer<T>> observers) {
* @param event the event object
*/
public void fire(T event, String qualifier) {
for (var observer : observers) {
observer.observe(event, qualifier);
}
observers.stream()
.sorted(Comparator.comparing(Observer::priority))
.forEach(observer -> observer.observe(event, qualifier));
}

/**
Expand All @@ -58,6 +59,7 @@ public CompletableFuture<Void> fireAsync(T event, String qualifier) {

return CompletableFuture.allOf(
observers.stream()
.sorted(Comparator.comparing(Observer::priority))
.map(o -> o.observeAsync(event, qualifier))
.toArray(CompletableFuture[]::new));
}
Expand Down
8 changes: 7 additions & 1 deletion inject/src/main/java/io/avaje/inject/events/Observer.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,22 @@
*/
public class Observer<T> {

private final int priority;
private final boolean async;
private final Consumer<T> method;
private final String qualifierString;

public Observer(boolean async, Consumer<T> method, String qualifierString) {
public Observer(int priority, boolean async, Consumer<T> method, String qualifierString) {
this.priority = priority;
this.async = async;
this.method = method;
this.qualifierString = qualifierString;
}

public int priority() {
return priority;
}

void observe(T event, String qualifier) {

if (event != null && qualifierString.equalsIgnoreCase(qualifier)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import java.lang.reflect.Type;
import java.util.List;
import java.util.function.Consumer;

/** Manages all {@link Observer} instances in the BeanScope */
public interface ObserverManager {
Expand All @@ -16,7 +15,7 @@ public interface ObserverManager {
* @param observer the consumer to execute when a matching event is found
* @param qualifier qualifier string that this observer should be registered to
*/
<T> void registerObserver(boolean async, Type type, Consumer<T> observer, String qualifier);
<T> void registerObserver(Type eventType, Observer<T> observer);

/**
* Retrieves a list of all Observers registered by the given type
Expand Down
3 changes: 3 additions & 0 deletions inject/src/main/java/io/avaje/inject/events/Observes.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@
@Retention(SOURCE)
public @interface Observes {

/** The priority of the observe method */
int priority() default 1000;

/** Whether this Observer should be exclusively executed asynchronously */
boolean async() default false;
}
4 changes: 2 additions & 2 deletions inject/src/main/java/io/avaje/inject/spi/Builder.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ static Builder newBuilder(Set<String> profiles, PropertyRequiresPlugin plugin, L
*/
<T> void registerProvider(Provider<T> provider);

/** Register the provider into the context. */
<T> void registerObserver(boolean sync, Type type, Consumer<T> observer, String qualifier);
/** Register the observer into the context. */
<T> void registerObserver(Type type, int priority, boolean sync, Consumer<T> observer, String qualifier);

/**
* Register the bean instance into the context.
Expand Down
7 changes: 4 additions & 3 deletions inject/src/main/java/io/avaje/inject/spi/DBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import io.avaje.inject.BeanEntry;
import io.avaje.inject.BeanScope;
import io.avaje.inject.events.Observer;
import io.avaje.inject.events.ObserverManager;
import jakarta.inject.Provider;

Expand Down Expand Up @@ -184,9 +185,9 @@ public final <T> void registerProvider(Provider<T> provider) {
}

@Override
public <T> void registerObserver(
boolean sync, Type type, Consumer<T> observer, String qualifier) {
get(ObserverManager.class).registerObserver(sync, type, observer, qualifier);
public <T> void registerObserver(Type type, int priority, boolean sync, Consumer<T> observer, String qualifier) {
get(ObserverManager.class)
.registerObserver(type, new Observer<T>(priority, sync, observer, qualifier));
}

@Override
Expand Down
21 changes: 12 additions & 9 deletions inject/src/test/java/io/avaje/inject/DObserverManagerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import org.junit.jupiter.api.Test;

import io.avaje.inject.events.Observer;
import io.avaje.inject.events.ObserverManager;
import io.avaje.inject.events.TestEvent;
import io.avaje.inject.events.TestGenericEvent;
Expand All @@ -23,32 +24,34 @@ void test() {
testEvent.fire("sus");

assertThat(aBoolean.get()).isFalse();
manager.<String>registerObserver(false, String.class, s -> aBoolean.set(true), "");
manager.<String>registerObserver(
String.class, new Observer<>(0, false, s -> aBoolean.set(true), ""));

testEvent.fire("sus");
assertThat(aBoolean.get()).isTrue();
}

@Test
void testRegisterAsync() throws InterruptedException {
void testAsync() throws InterruptedException {
AtomicBoolean aBoolean = new AtomicBoolean();

manager.<List<String>>registerObserver(
true, new GenericType<List<String>>() {}.type(), s -> aBoolean.set(true), "");
manager.<String>registerObserver(
String.class, new Observer<>(0, true, s -> aBoolean.set(true), ""));

new TestGenericEvent(manager).fire(List.of("str"));
Thread.sleep(200);
new TestEvent(manager).fireAsync("str");
Thread.sleep(500);
assertThat(aBoolean.get()).isTrue();
}

@Test
void testAsync() throws InterruptedException {
void testGenericAsync() throws InterruptedException {
AtomicBoolean aBoolean = new AtomicBoolean();

manager.<List<String>>registerObserver(
false, new GenericType<List<String>>() {}.type(), s -> aBoolean.set(true), "");
new GenericType<List<String>>() {}.type(),
new Observer<>(0, false, s -> aBoolean.set(true), ""));

new TestGenericEvent(manager).fireAsync(List.of("str"));
new TestGenericEvent(manager).fire(List.of("str"));
Thread.sleep(200);
assertThat(aBoolean.get()).isTrue();
}
Expand Down

0 comments on commit 4391d21

Please sign in to comment.