From 2f20dddf4bb7bc88cea833badfb08a440afcd5e9 Mon Sep 17 00:00:00 2001 From: mercyblitz Date: Mon, 20 May 2019 15:02:07 +0800 Subject: [PATCH] Polish apache/incubator-dubbo/#4096 : To add new module for Dubbo Event --- dubbo-all/pom.xml | 9 + dubbo-event/pom.xml | 30 ++++ .../dubbo/event/AbstractEventDispatcher.java | 155 ++++++++++++++++++ .../dubbo/event/DirectEventDispatcher.java | 30 ++++ .../java/org/apache/dubbo/event/Event.java | 49 ++++++ .../apache/dubbo/event/EventDispatcher.java | 66 ++++++++ .../org/apache/dubbo/event/EventListener.java | 124 ++++++++++++++ .../org/apache/dubbo/event/GenericEvent.java | 22 ++- .../org/apache/dubbo/event/Listenable.java | 131 +++++++++++++++ .../dubbo/event/ParallelEventDispatcher.java | 32 ++++ .../org.apache.dubbo.event.EventDispatcher | 2 + .../dubbo/event/AbstractEventListener.java | 40 +++++ .../event/DirectEventDispatcherTest.java | 153 +++++++++++++++++ .../org/apache/dubbo/event/EchoEvent.java | 13 +- .../apache/dubbo/event/EchoEventListener.java | 32 ++++ .../dubbo/event/EchoEventListener2.java | 61 +++++++ .../dubbo/event/EventDispatcherTest.java | 45 +++++ .../apache/dubbo/event/EventListenerTest.java | 44 +++++ .../apache/dubbo/event/GenericEventTest.java | 41 +++++ .../event/ParallelEventDispatcherTest.java | 59 +++++++ .../org.apache.dubbo.event.EventListener | 1 + pom.xml | 1 + 22 files changed, 1131 insertions(+), 9 deletions(-) create mode 100644 dubbo-event/pom.xml create mode 100644 dubbo-event/src/main/java/org/apache/dubbo/event/AbstractEventDispatcher.java create mode 100644 dubbo-event/src/main/java/org/apache/dubbo/event/DirectEventDispatcher.java create mode 100644 dubbo-event/src/main/java/org/apache/dubbo/event/Event.java create mode 100644 dubbo-event/src/main/java/org/apache/dubbo/event/EventDispatcher.java create mode 100644 dubbo-event/src/main/java/org/apache/dubbo/event/EventListener.java rename dubbo-configcenter/dubbo-configcenter-api/src/main/java/org/apache/dubbo/configcenter/Constants.java => dubbo-event/src/main/java/org/apache/dubbo/event/GenericEvent.java (71%) create mode 100644 dubbo-event/src/main/java/org/apache/dubbo/event/Listenable.java create mode 100644 dubbo-event/src/main/java/org/apache/dubbo/event/ParallelEventDispatcher.java create mode 100644 dubbo-event/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.event.EventDispatcher create mode 100644 dubbo-event/src/test/java/org/apache/dubbo/event/AbstractEventListener.java create mode 100644 dubbo-event/src/test/java/org/apache/dubbo/event/DirectEventDispatcherTest.java rename dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/Constants.java => dubbo-event/src/test/java/org/apache/dubbo/event/EchoEvent.java (81%) create mode 100644 dubbo-event/src/test/java/org/apache/dubbo/event/EchoEventListener.java create mode 100644 dubbo-event/src/test/java/org/apache/dubbo/event/EchoEventListener2.java create mode 100644 dubbo-event/src/test/java/org/apache/dubbo/event/EventDispatcherTest.java create mode 100644 dubbo-event/src/test/java/org/apache/dubbo/event/EventListenerTest.java create mode 100644 dubbo-event/src/test/java/org/apache/dubbo/event/GenericEventTest.java create mode 100644 dubbo-event/src/test/java/org/apache/dubbo/event/ParallelEventDispatcherTest.java create mode 100644 dubbo-event/src/test/resources/META-INF/services/org.apache.dubbo.event.EventListener diff --git a/dubbo-all/pom.xml b/dubbo-all/pom.xml index be5a2e3fd77..aec8b0f4847 100644 --- a/dubbo-all/pom.xml +++ b/dubbo-all/pom.xml @@ -500,6 +500,15 @@ true + + + org.apache.dubbo + dubbo-event + ${project.version} + compile + true + + org.springframework diff --git a/dubbo-event/pom.xml b/dubbo-event/pom.xml new file mode 100644 index 00000000000..6f0bb82fb81 --- /dev/null +++ b/dubbo-event/pom.xml @@ -0,0 +1,30 @@ + + + + org.apache.dubbo + dubbo-parent + ${revision} + ../pom.xml + + 4.0.0 + + dubbo-event + jar + + dubbo-event + The event module of Dubbo project + + + + org.apache.dubbo + dubbo-common + ${revision} + true + + + + + + \ No newline at end of file diff --git a/dubbo-event/src/main/java/org/apache/dubbo/event/AbstractEventDispatcher.java b/dubbo-event/src/main/java/org/apache/dubbo/event/AbstractEventDispatcher.java new file mode 100644 index 00000000000..8989f29a0aa --- /dev/null +++ b/dubbo-event/src/main/java/org/apache/dubbo/event/AbstractEventDispatcher.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.event; + +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.ServiceLoader; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executor; +import java.util.function.Consumer; + +import static java.util.Collections.sort; +import static java.util.Collections.unmodifiableList; +import static java.util.ServiceLoader.load; +import static org.apache.dubbo.event.EventListener.findEventType; + +/** + * The abstract {@link EventDispatcher} providers the common implementation. + * + * @see EventDispatcher + * @see Listenable + * @see ServiceLoader + * @see EventListener + * @see Event + * @since 2.7.2 + */ +public abstract class AbstractEventDispatcher implements EventDispatcher { + + private final Object mutex = new Object(); + + private final ConcurrentMap, List> listenersCache = new ConcurrentHashMap<>(); + + private final Executor executor; + + /** + * Constructor with an instance of {@link Executor} + * + * @param executor {@link Executor} + * @throws NullPointerException executor is null + */ + protected AbstractEventDispatcher(Executor executor) { + if (executor == null) { + throw new NullPointerException("executor must not be null"); + } + this.executor = executor; + this.loadEventListenerInstances(); + } + + @Override + public void addEventListener(EventListener listener) throws NullPointerException, IllegalArgumentException { + Listenable.assertListener(listener); + doInListener(listener, listeners -> { + addIfAbsent(listeners, listener); + }); + } + + @Override + public void removeEventListener(EventListener listener) throws NullPointerException, IllegalArgumentException { + Listenable.assertListener(listener); + doInListener(listener, listeners -> listeners.remove(listener)); + } + + @Override + public List> getAllEventListeners() { + List> listeners = new LinkedList<>(); + + listenersCache + .entrySet() + .stream() + .map(Map.Entry::getValue) + .flatMap(Collection::stream) + .forEach(listener -> { + addIfAbsent(listeners, listener); + }); + + sort((List) listeners); + + return unmodifiableList(listeners); + } + + private void addIfAbsent(Collection collection, E element) { + if (!collection.contains(element)) { + collection.add(element); + } + } + + @Override + public void dispatch(Event event) { + + Executor executor = getExecutor(); + + // execute in sequential or parallel execution model + executor.execute(() -> { + listenersCache.entrySet() + .stream() + .filter(entry -> entry.getKey().isAssignableFrom(event.getClass())) + .map(Map.Entry::getValue) + .flatMap(Collection::stream) + .forEach(listener -> { + listener.onEvent(event); + }); + }); + } + + /** + * @return the non-null {@link Executor} + */ + @Override + public final Executor getExecutor() { + return executor; + } + + protected void doInListener(EventListener listener, Consumer> consumer) { + Class eventType = findEventType(listener); + if (eventType != null) { + synchronized (mutex) { + List listeners = listenersCache.computeIfAbsent(eventType, e -> new LinkedList<>()); + // consume + consumer.accept(listeners); + // sort + sort(listeners); + } + } + } + + /** + * Default, load the instances of {@link EventListener event listeners} by {@link ServiceLoader} + *

+ * It could be override by the sub-class + * + * @see EventListener + * @see ServiceLoader#load(Class) + */ + protected void loadEventListenerInstances() { + ServiceLoader serviceLoader = load(EventListener.class, getClass().getClassLoader()); + serviceLoader.forEach(this::addEventListener); + } +} diff --git a/dubbo-event/src/main/java/org/apache/dubbo/event/DirectEventDispatcher.java b/dubbo-event/src/main/java/org/apache/dubbo/event/DirectEventDispatcher.java new file mode 100644 index 00000000000..91282b34c6e --- /dev/null +++ b/dubbo-event/src/main/java/org/apache/dubbo/event/DirectEventDispatcher.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.event; + +/** + * Direct {@link EventDispatcher} implementation uses current thread execution model + * + * @see EventDispatcher + * @since 2.7.2 + */ +public final class DirectEventDispatcher extends AbstractEventDispatcher { + + public DirectEventDispatcher() { + super(DIRECT_EXECUTOR); + } +} diff --git a/dubbo-event/src/main/java/org/apache/dubbo/event/Event.java b/dubbo-event/src/main/java/org/apache/dubbo/event/Event.java new file mode 100644 index 00000000000..13f2aec16e1 --- /dev/null +++ b/dubbo-event/src/main/java/org/apache/dubbo/event/Event.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.event; + +import java.util.EventObject; + +/** + * An event object of Dubbo is based on the Java standard {@link EventObject event} + * + * @since 2.7.2 + */ +public abstract class Event extends EventObject { + + private static final long serialVersionUID = -1704315605423947137L; + + /** + * The timestamp of event occurs + */ + private final long timestamp; + + /** + * Constructs a prototypical Event. + * + * @param source The object on which the Event initially occurred. + * @throws IllegalArgumentException if source is null. + */ + public Event(Object source) { + super(source); + this.timestamp = System.currentTimeMillis(); + } + + public long getTimestamp() { + return timestamp; + } +} diff --git a/dubbo-event/src/main/java/org/apache/dubbo/event/EventDispatcher.java b/dubbo-event/src/main/java/org/apache/dubbo/event/EventDispatcher.java new file mode 100644 index 00000000000..ab3a2e4998b --- /dev/null +++ b/dubbo-event/src/main/java/org/apache/dubbo/event/EventDispatcher.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.event; + +import org.apache.dubbo.common.extension.ExtensionLoader; +import org.apache.dubbo.common.extension.SPI; + +import java.util.concurrent.Executor; + +/** + * {@link Event Dubbo Event} Dispatcher + * + * @see Event + * @see EventListener + * @see DirectEventDispatcher + * @since 2.7.2 + */ +@SPI("direct") +public interface EventDispatcher extends Listenable> { + + /** + * Direct {@link Executor} uses sequential execution model + */ + Executor DIRECT_EXECUTOR = Runnable::run; + + /** + * Dispatch a Dubbo event to the registered {@link EventListener Dubbo event listeners} + * + * @param event a {@link Event Dubbo event} + */ + void dispatch(Event event); + + /** + * The {@link Executor} to dispatch a {@link Event Dubbo event} + * + * @return default implementation directly invoke {@link Runnable#run()} method, rather than multiple-threaded + * {@link Executor}. If the return value is null, the behavior is same as default. + * @see #DIRECT_EXECUTOR + */ + default Executor getExecutor() { + return DIRECT_EXECUTOR; + } + + /** + * The default extension of {@link EventDispatcher} is loaded by {@link ExtensionLoader} + * + * @return the default extension of {@link EventDispatcher} + */ + static EventDispatcher getDefaultExtension() { + return ExtensionLoader.getExtensionLoader(EventDispatcher.class).getDefaultExtension(); + } +} diff --git a/dubbo-event/src/main/java/org/apache/dubbo/event/EventListener.java b/dubbo-event/src/main/java/org/apache/dubbo/event/EventListener.java new file mode 100644 index 00000000000..92c35b9c882 --- /dev/null +++ b/dubbo-event/src/main/java/org/apache/dubbo/event/EventListener.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.event; + +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; +import java.util.Objects; + +import static java.lang.Integer.compare; +import static org.apache.dubbo.common.utils.ReflectUtils.findParameterizedTypes; + +/** + * The {@link Event Dubbo Event} Listener that is based on Java standard {@link java.util.EventListener} interface supports + * the generic {@link Event}. + *

+ * The {@link #onEvent(Event) handle method} will be notified when the matched-type {@link Event Dubbo Event} is + * published, whose priority could be changed by {@link #getPriority()} method. + * + * @param the concrete class of {@link Event Dubbo Event} + * @see Event + * @see java.util.EventListener + * @since 2.7.2 + */ +@FunctionalInterface +public interface EventListener extends java.util.EventListener, Comparable> { + + /** + * Handle a {@link Event Dubbo Event} when it's be published + * + * @param event a {@link Event Dubbo Event} + */ + void onEvent(E event); + + /** + * The priority of {@link EventListener current listener}. + * + * @return the value is more greater, the priority is more lower. + * {@link Integer#MIN_VALUE} indicates the highest priority. The default value is {@link Integer#MAX_VALUE}. + * The comparison rule , refer to {@link #compareTo(EventListener)}. + * @see #compareTo(EventListener) + * @see Integer#MAX_VALUE + * @see Integer#MIN_VALUE + */ + default int getPriority() { + return Integer.MAX_VALUE; + } + + @Override + default int compareTo(EventListener another) { + return compare(this.getPriority(), another.getPriority()); + } + + /** + * Find the {@link Class type} {@link Event Dubbo event} from the specified {@link EventListener Dubbo event listener} + * + * @param listener the {@link Class class} of {@link EventListener Dubbo event listener} + * @return null if not found + */ + static Class findEventType(EventListener listener) { + return findEventType(listener.getClass()); + } + + /** + * Find the {@link Class type} {@link Event Dubbo event} from the specified {@link EventListener Dubbo event listener} + * + * @param listenerClass the {@link Class class} of {@link EventListener Dubbo event listener} + * @return null if not found + */ + static Class findEventType(Class listenerClass) { + Class eventType = null; + + if (listenerClass != null && EventListener.class.isAssignableFrom(listenerClass)) { + eventType = findParameterizedTypes(listenerClass) + .stream() + .map(EventListener::findEventType) + .filter(Objects::nonNull) + .findAny() + .orElse((Class) findEventType(listenerClass.getSuperclass())); + } + + return eventType; + } + + /** + * Find the type {@link Event Dubbo event} from the specified {@link ParameterizedType} presents + * a class of {@link EventListener Dubbo event listener} + * + * @param parameterizedType the {@link ParameterizedType} presents a class of {@link EventListener Dubbo event listener} + * @return null if not found + */ + static Class findEventType(ParameterizedType parameterizedType) { + Class eventType = null; + + Type rawType = parameterizedType.getRawType(); + if ((rawType instanceof Class) && EventListener.class.isAssignableFrom((Class) rawType)) { + Type[] typeArguments = parameterizedType.getActualTypeArguments(); + for (Type typeArgument : typeArguments) { + if (typeArgument instanceof Class) { + Class argumentClass = (Class) typeArgument; + if (Event.class.isAssignableFrom(argumentClass)) { + eventType = argumentClass; + break; + } + } + } + } + + return eventType; + } +} \ No newline at end of file diff --git a/dubbo-configcenter/dubbo-configcenter-api/src/main/java/org/apache/dubbo/configcenter/Constants.java b/dubbo-event/src/main/java/org/apache/dubbo/event/GenericEvent.java similarity index 71% rename from dubbo-configcenter/dubbo-configcenter-api/src/main/java/org/apache/dubbo/configcenter/Constants.java rename to dubbo-event/src/main/java/org/apache/dubbo/event/GenericEvent.java index fcf1a51fc56..591bcebb086 100644 --- a/dubbo-configcenter/dubbo-configcenter-api/src/main/java/org/apache/dubbo/configcenter/Constants.java +++ b/dubbo-event/src/main/java/org/apache/dubbo/event/GenericEvent.java @@ -14,11 +14,21 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.dubbo.configcenter; +package org.apache.dubbo.event; -public interface Constants { - String CONFIG_CLUSTER_KEY = "config.cluster"; - String CONFIG_NAMESPACE_KEY = "config.namespace"; - String CONFIG_GROUP_KEY = "config.group"; - String CONFIG_CHECK_KEY = "config.check"; +/** + * Generic {@link Event Dubbo event} + * + * @param the type of event source + * @since 2.7.2 + */ +public class GenericEvent extends Event { + + public GenericEvent(S source) { + super(source); + } + + public S getSource() { + return (S) super.getSource(); + } } diff --git a/dubbo-event/src/main/java/org/apache/dubbo/event/Listenable.java b/dubbo-event/src/main/java/org/apache/dubbo/event/Listenable.java new file mode 100644 index 00000000000..9b854ac2203 --- /dev/null +++ b/dubbo-event/src/main/java/org/apache/dubbo/event/Listenable.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.event; + +import java.lang.reflect.Modifier; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static java.util.stream.StreamSupport.stream; + +/** + * Dubbo Event Listenable + * + * @see EventDispatcher + * @since 2.7.2 + */ +public interface Listenable> { + + /** + * Add a {@link EventListener Dubbo event listener} + * + * @param listener a {@link EventListener Dubbo event listener} + * If current {@link EventListener} is existed, return false + * @throws NullPointerException if listener argument is null + * @throws IllegalArgumentException if listener argument is not concrete instance + */ + void addEventListener(E listener) throws NullPointerException, IllegalArgumentException; + + /** + * Add one or more {@link EventListener Dubbo event listeners} + * + * @param listener a {@link EventListener Dubbo event listener} + * @param others an optional {@link EventListener Dubbo event listeners} + * @throws NullPointerException if one of arguments is null + * @throws IllegalArgumentException if one of arguments argument is not concrete instance + */ + default void addEventListeners(E listener, E... others) throws NullPointerException, + IllegalArgumentException { + List listeners = new ArrayList<>(1 + others.length); + listeners.add(listener); + listeners.addAll(Arrays.asList(others)); + addEventListeners(listeners); + } + + /** + * Add multiple {@link EventListener Dubbo event listeners} + * + * @param listeners the {@link EventListener Dubbo event listeners} + * @throws NullPointerException if listeners argument is null + * @throws IllegalArgumentException if any element of listeners is not concrete instance + */ + default void addEventListeners(Iterable listeners) throws NullPointerException, IllegalArgumentException { + stream(listeners.spliterator(), false).forEach(this::addEventListener); + } + + /** + * Remove a a {@link EventListener Dubbo event listener} + * + * @param listener a {@link EventListener Dubbo event listener} + * @return If remove successfully, return true. + * If current {@link EventListener} is existed, return false + * @throws NullPointerException if listener argument is null + */ + void removeEventListener(E listener) throws NullPointerException, IllegalArgumentException; + + /** + * Remove a {@link EventListener Dubbo event listener} + * + * @param listeners the {@link EventListener Dubbo event listeners} + * @return If remove successfully, return true. + * If current {@link EventListener} is existed, return false + * @throws NullPointerException if listener argument is null + * @throws IllegalArgumentException if any element of listeners is not concrete instance + */ + default void removeEventListeners(Iterable listeners) throws NullPointerException, IllegalArgumentException { + stream(listeners.spliterator(), false).forEach(this::removeEventListener); + } + + /** + * Remove all {@link EventListener Dubbo event listeners} + * + * @return a amount of removed listeners + */ + default void removeAllEventListeners() { + removeEventListeners(getAllEventListeners()); + } + + /** + * Get all registered {@link EventListener Dubbo event listeners} + * + * @return non-null read-only ordered {@link EventListener Dubbo event listeners} + * @see EventListener#getPriority() + */ + List getAllEventListeners(); + + + /** + * Assets the listener is valid or not + * + * @param listener the instance of {@link EventListener} + * @throws NullPointerException + */ + static void assertListener(EventListener listener) throws NullPointerException { + if (listener == null) { + throw new NullPointerException("The listener must not be null."); + } + + Class listenerClass = listener.getClass(); + + int modifiers = listenerClass.getModifiers(); + + if (Modifier.isAbstract(modifiers) || Modifier.isInterface(modifiers)) { + throw new IllegalArgumentException("The listener must be concrete class"); + } + } +} diff --git a/dubbo-event/src/main/java/org/apache/dubbo/event/ParallelEventDispatcher.java b/dubbo-event/src/main/java/org/apache/dubbo/event/ParallelEventDispatcher.java new file mode 100644 index 00000000000..f7b0f329763 --- /dev/null +++ b/dubbo-event/src/main/java/org/apache/dubbo/event/ParallelEventDispatcher.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.event; + +import java.util.concurrent.ForkJoinPool; + +/** + * Parallel {@link EventDispatcher} implementation uses {@link ForkJoinPool#commonPool() JDK common thread pool} + * + * @see ForkJoinPool#commonPool() + * @since 2.7.2 + */ +public class ParallelEventDispatcher extends AbstractEventDispatcher { + + public ParallelEventDispatcher() { + super(ForkJoinPool.commonPool()); + } +} diff --git a/dubbo-event/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.event.EventDispatcher b/dubbo-event/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.event.EventDispatcher new file mode 100644 index 00000000000..ecd54edb280 --- /dev/null +++ b/dubbo-event/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.event.EventDispatcher @@ -0,0 +1,2 @@ +direct=org.apache.dubbo.event.DirectEventDispatcher +parallel=org.apache.dubbo.event.ParallelEventDispatcher diff --git a/dubbo-event/src/test/java/org/apache/dubbo/event/AbstractEventListener.java b/dubbo-event/src/test/java/org/apache/dubbo/event/AbstractEventListener.java new file mode 100644 index 00000000000..692b3b6d12b --- /dev/null +++ b/dubbo-event/src/test/java/org/apache/dubbo/event/AbstractEventListener.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.event; + +import java.util.concurrent.atomic.AtomicInteger; + +public abstract class AbstractEventListener implements EventListener { + + private final AtomicInteger eventOccurs = new AtomicInteger(0); + + @Override + public final void onEvent(E event) { + eventOccurs.getAndIncrement(); + handleEvent(event); + } + + protected abstract void handleEvent(E event); + + public int getEventOccurs() { + return eventOccurs.get(); + } + + protected void println(String message) { + System.out.printf("[%s] %s\n", Thread.currentThread().getName(), message); + } +} \ No newline at end of file diff --git a/dubbo-event/src/test/java/org/apache/dubbo/event/DirectEventDispatcherTest.java b/dubbo-event/src/test/java/org/apache/dubbo/event/DirectEventDispatcherTest.java new file mode 100644 index 00000000000..c3cc7323df2 --- /dev/null +++ b/dubbo-event/src/test/java/org/apache/dubbo/event/DirectEventDispatcherTest.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.event; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static java.util.Arrays.asList; +import static java.util.Collections.emptyList; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * {@link DirectEventDispatcher} Test + * + * @since 2.7.2 + */ +public class DirectEventDispatcherTest { + + private DirectEventDispatcher dispatcher; + + private EchoEventListener echoEventListener; + + private EchoEventListener2 echoEventListener2; + + @BeforeEach + public void init() { + dispatcher = new DirectEventDispatcher(); + echoEventListener = new EchoEventListener(); + echoEventListener2 = new EchoEventListener2(); + } + + @AfterEach + public void destroy() { + dispatcher.removeAllEventListeners(); + } + + @Test + public void testGetExecutor() { + assertNotNull(dispatcher.getExecutor()); + } + + @Test + public void testGetAllListeners() { + assertTrue(dispatcher.getAllEventListeners().isEmpty()); + } + + @Test + public void testSingleListener() { + // add two listeners + dispatcher.addEventListener(echoEventListener); + dispatcher.addEventListener(echoEventListener2); + assertEquals(asList(echoEventListener2, echoEventListener), dispatcher.getAllEventListeners()); + + // add a duplicated listener + dispatcher.addEventListener(echoEventListener); + assertEquals(asList(echoEventListener2, echoEventListener), dispatcher.getAllEventListeners()); + + // remove + dispatcher.removeEventListener(echoEventListener); + assertEquals(asList(echoEventListener2), dispatcher.getAllEventListeners()); + + dispatcher.removeEventListener(echoEventListener2); + assertEquals(emptyList(), dispatcher.getAllEventListeners()); + } + + @Test + public void testMultipleListeners() { + + // add two listeners + dispatcher.addEventListeners(echoEventListener, echoEventListener2); + assertEquals(asList(echoEventListener2, echoEventListener), dispatcher.getAllEventListeners()); + + // remove all listeners + dispatcher.removeAllEventListeners(); + assertEquals(emptyList(), dispatcher.getAllEventListeners()); + + // add the duplicated listeners + dispatcher.addEventListeners(echoEventListener, echoEventListener, echoEventListener2); + assertEquals(asList(echoEventListener2, echoEventListener), dispatcher.getAllEventListeners()); + + // remove all listeners + dispatcher.removeAllEventListeners(); + assertEquals(emptyList(), dispatcher.getAllEventListeners()); + + dispatcher.addEventListeners(asList(echoEventListener, echoEventListener, echoEventListener2)); + assertEquals(asList(echoEventListener2, echoEventListener), dispatcher.getAllEventListeners()); + + dispatcher.removeEventListeners(asList(echoEventListener, echoEventListener, echoEventListener2)); + assertEquals(emptyList(), dispatcher.getAllEventListeners()); + } + + @Test + public void testDispatchEvent() { + + dispatcher.addEventListener(echoEventListener); + + // dispatch a Event + dispatcher.dispatch(new Event("Test") { + }); + + // no-op occurs + assertEquals(0, echoEventListener.getEventOccurs()); + + // dispatch a EchoEvent + dispatcher.dispatch(new EchoEvent("Hello,World")); + + // event has been handled + assertEquals(1, echoEventListener.getEventOccurs()); + + dispatcher.addEventListener(echoEventListener2); + + // reset the listeners + init(); + dispatcher.addEventListeners(echoEventListener, echoEventListener2); + + // dispatch a Event + dispatcher.dispatch(new Event("Test") { + }); + + // echoEventListener will be not triggered + 0 + // echoEventListener2 will be triggered + 1 + assertEquals(0, echoEventListener.getEventOccurs()); + assertEquals(1, echoEventListener2.getEventOccurs()); + + // dispatch a EchoEvent + // echoEventListener and echoEventListener2 are triggered both (+1) + dispatcher.dispatch(new EchoEvent("Hello,World")); + assertEquals(1, echoEventListener.getEventOccurs()); + assertEquals(2, echoEventListener2.getEventOccurs()); + + // both +1 + dispatcher.dispatch(new EchoEvent("2019")); + assertEquals(2, echoEventListener.getEventOccurs()); + assertEquals(3, echoEventListener2.getEventOccurs()); + } +} diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/Constants.java b/dubbo-event/src/test/java/org/apache/dubbo/event/EchoEvent.java similarity index 81% rename from dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/Constants.java rename to dubbo-event/src/test/java/org/apache/dubbo/event/EchoEvent.java index c1bfb3fe43d..785d411f256 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/Constants.java +++ b/dubbo-event/src/test/java/org/apache/dubbo/event/EchoEvent.java @@ -14,9 +14,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +package org.apache.dubbo.event; -package org.apache.dubbo.registry; +/** + * Echo {@link Event} + * + * @since 2.7.2 + */ +class EchoEvent extends Event { -public interface Constants { - String REGISTER_IP_KEY = "register.ip"; + public EchoEvent(Object source) { + super(source); + } } diff --git a/dubbo-event/src/test/java/org/apache/dubbo/event/EchoEventListener.java b/dubbo-event/src/test/java/org/apache/dubbo/event/EchoEventListener.java new file mode 100644 index 00000000000..f88b9df3318 --- /dev/null +++ b/dubbo-event/src/test/java/org/apache/dubbo/event/EchoEventListener.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.event; + +import java.io.Serializable; + +/** + * {@link EchoEvent} {@link EventListener} + * + * @since 2.7.2 + */ +public class EchoEventListener extends AbstractEventListener implements Serializable { + + @Override + public void handleEvent(EchoEvent event) { + println("EchoEventListener : " + event); + } +} \ No newline at end of file diff --git a/dubbo-event/src/test/java/org/apache/dubbo/event/EchoEventListener2.java b/dubbo-event/src/test/java/org/apache/dubbo/event/EchoEventListener2.java new file mode 100644 index 00000000000..f30327c4852 --- /dev/null +++ b/dubbo-event/src/test/java/org/apache/dubbo/event/EchoEventListener2.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.event; + +import java.io.Serializable; +import java.util.Objects; +import java.util.Vector; + +/** + * {@link EchoEvent} {@link EventListener} 2 + * + * @since 2.7.2 + */ +public class EchoEventListener2 extends Vector> implements Serializable, Comparable>, + EventListener { + + private AbstractEventListener delegate = new AbstractEventListener() { + @Override + protected void handleEvent(Event event) { + println("EchoEventListener2 : " + event); + } + }; + + @Override + public void onEvent(Event event) { + delegate.onEvent(event); + } + + @Override + public int getPriority() { + return 0; + } + + public int getEventOccurs() { + return delegate.getEventOccurs(); + } + + @Override + public boolean equals(Object o) { + return this.getClass().equals(o.getClass()); + } + + @Override + public int hashCode() { + return Objects.hash(this.getClass()); + } +} diff --git a/dubbo-event/src/test/java/org/apache/dubbo/event/EventDispatcherTest.java b/dubbo-event/src/test/java/org/apache/dubbo/event/EventDispatcherTest.java new file mode 100644 index 00000000000..b8d57f93992 --- /dev/null +++ b/dubbo-event/src/test/java/org/apache/dubbo/event/EventDispatcherTest.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.event; + +import org.junit.jupiter.api.Test; + +import static org.apache.dubbo.event.EventDispatcher.DIRECT_EXECUTOR; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * {@link EventDispatcher} Test + * + * @see DirectEventDispatcher + * @since 2.7.2 + */ +public class EventDispatcherTest { + + private EventDispatcher defaultInstance = EventDispatcher.getDefaultExtension(); + + @Test + public void testDefaultInstance() { + assertEquals(DirectEventDispatcher.class, defaultInstance.getClass()); + } + + @Test + public void testDefaultMethods() { + assertEquals(DIRECT_EXECUTOR, defaultInstance.getExecutor()); + assertTrue(defaultInstance.getAllEventListeners().isEmpty()); + } +} diff --git a/dubbo-event/src/test/java/org/apache/dubbo/event/EventListenerTest.java b/dubbo-event/src/test/java/org/apache/dubbo/event/EventListenerTest.java new file mode 100644 index 00000000000..eb3d27b02ea --- /dev/null +++ b/dubbo-event/src/test/java/org/apache/dubbo/event/EventListenerTest.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.event; + +import org.junit.jupiter.api.Test; + +import static org.apache.dubbo.event.EventListener.findEventType; +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** + * {@link EventListener} Test + * + * @since 2.7.2 + */ +public class EventListenerTest { + + @Test + public void testFindEventHierarchicalTypes() { + assertEquals(EchoEvent.class, findEventType(new EchoEventListener())); + assertEquals(Event.class, findEventType(new EchoEventListener2())); + + assertEquals(EchoEvent.class, findEventType(EchoEventListener.class)); + assertEquals(Event.class, findEventType(EchoEventListener2.class)); + } + + @Test + public void testOnEvent() { + } + +} \ No newline at end of file diff --git a/dubbo-event/src/test/java/org/apache/dubbo/event/GenericEventTest.java b/dubbo-event/src/test/java/org/apache/dubbo/event/GenericEventTest.java new file mode 100644 index 00000000000..92970343f1c --- /dev/null +++ b/dubbo-event/src/test/java/org/apache/dubbo/event/GenericEventTest.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.event; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * {@link GenericEvent} Test + * + * @since 2.7.2 + */ +public class GenericEventTest { + + @Test + public void test() { + + long timestamp = System.currentTimeMillis(); + GenericEvent event = new GenericEvent("Hello,World"); + + assertEquals("Hello,World", event.getSource()); + assertTrue(event.getTimestamp() >= timestamp); + } + +} diff --git a/dubbo-event/src/test/java/org/apache/dubbo/event/ParallelEventDispatcherTest.java b/dubbo-event/src/test/java/org/apache/dubbo/event/ParallelEventDispatcherTest.java new file mode 100644 index 00000000000..755d482d2a8 --- /dev/null +++ b/dubbo-event/src/test/java/org/apache/dubbo/event/ParallelEventDispatcherTest.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.event; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.TimeUnit; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** + * {@link ParallelEventDispatcher} Test + * + * @since 2.7.2 + */ +public class ParallelEventDispatcherTest { + + private EventDispatcher eventDispatcher; + + private AbstractEventListener listener; + + @BeforeEach + public void init() { + eventDispatcher = new ParallelEventDispatcher(); + listener = new EchoEventListener(); + eventDispatcher.addEventListener(listener); + } + + @Test + public void testDispatchEvent() throws InterruptedException { + eventDispatcher.dispatch(new EchoEvent("Hello,World")); + ForkJoinPool.commonPool().awaitTermination(1, TimeUnit.SECONDS); + // event has been handled + assertEquals(1, listener.getEventOccurs()); + } + + @AfterAll + public static void destroy() { + ForkJoinPool.commonPool().shutdown(); + } + +} diff --git a/dubbo-event/src/test/resources/META-INF/services/org.apache.dubbo.event.EventListener b/dubbo-event/src/test/resources/META-INF/services/org.apache.dubbo.event.EventListener new file mode 100644 index 00000000000..b226c932d13 --- /dev/null +++ b/dubbo-event/src/test/resources/META-INF/services/org.apache.dubbo.event.EventListener @@ -0,0 +1 @@ +# The Java Standard SPI(ServiceLoader) \ No newline at end of file diff --git a/pom.xml b/pom.xml index a1f7b56820d..434ff0e1d3e 100644 --- a/pom.xml +++ b/pom.xml @@ -148,6 +148,7 @@ dubbo-metadata-report dubbo-configcenter dubbo-dependencies + dubbo-event