Skip to content

Commit

Permalink
Polish apache/incubator-dubbo/#4096 : To add new module for Dubbo Event
Browse files Browse the repository at this point in the history
  • Loading branch information
taogu.mxx committed May 20, 2019
1 parent d0d1a65 commit 619e414
Show file tree
Hide file tree
Showing 22 changed files with 1,226 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,28 @@
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.net.URL;
import java.security.CodeSource;
import java.security.ProtectionDomain;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import static java.util.Arrays.asList;
import static java.util.Collections.unmodifiableSet;

/**
* ReflectUtils
Expand Down Expand Up @@ -1096,4 +1105,62 @@ public static Map<String, Method> getBeanPropertyReadMethods(Class cl) {

return properties;
}


/**
* Find the {@link Set} of {@link ParameterizedType}
*
* @param sourceClass the source {@link Class class}
* @return non-null read-only {@link Set}
* @since 2.7.2
*/
public static Set<ParameterizedType> findParameterizedTypes(Class<?> sourceClass) {
// Add Generic Interfaces
List<Type> genericTypes = new LinkedList<>(asList(sourceClass.getGenericInterfaces()));
// Add Generic Super Class
genericTypes.add(sourceClass.getGenericSuperclass());

Set<ParameterizedType> parameterizedTypes = genericTypes.stream()
.filter(type -> type instanceof ParameterizedType)// filter ParameterizedType
.map(type -> ParameterizedType.class.cast(type)) // cast to ParameterizedType
.collect(Collectors.toSet());

if (parameterizedTypes.isEmpty()) { // If not found, try to search super types recursively
genericTypes.stream()
.filter(type -> type instanceof Class)
.map(type -> Class.class.cast(type))
.forEach(superClass -> {
parameterizedTypes.addAll(findParameterizedTypes(superClass));
});
}

return unmodifiableSet(parameterizedTypes); // build as a Set

}

/**
* Find the hierarchical types form the source {@link Class class} by specified {@link Class type}.
*
* @param sourceClass the source {@link Class class}
* @param matchType the type to match
* @param <T> the type to match
* @return non-null read-only {@link Set}
* @since 2.7.2
*/
public static <T> Set<Class<T>> findHierarchicalTypes(Class<?> sourceClass, Class<T> matchType) {
if (sourceClass == null) {
return Collections.emptySet();
}

Set<Class<T>> hierarchicalTypes = new LinkedHashSet<>();

if (matchType.isAssignableFrom(sourceClass)) {
hierarchicalTypes.add((Class<T>) sourceClass);
}

// Find all super classes
hierarchicalTypes.addAll(findHierarchicalTypes(sourceClass.getSuperclass(), matchType));

return unmodifiableSet(hierarchicalTypes);
}
}
30 changes: 30 additions & 0 deletions dubbo-event/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-parent</artifactId>
<version>${revision}</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>dubbo-event</artifactId>
<packaging>jar</packaging>

<name>dubbo-event</name>
<description>The event module of Dubbo project</description>

<dependencies>
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-common</artifactId>
<version>${revision}</version>
<optional>true</optional>
</dependency>

</dependencies>


</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.event;

import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

import static java.util.Collections.sort;
import static java.util.Collections.unmodifiableList;
import static java.util.ServiceLoader.load;
import static org.apache.dubbo.event.EventListener.findEventType;

/**
* The abstract {@link EventDispatcher} providers the common implementation.
*
* @see EventDispatcher
* @see Listenable
* @see ServiceLoader
* @see EventListener
* @see Event
* @since 2.7.2
*/
public abstract class AbstractEventDispatcher implements EventDispatcher {

private final Object mutex = new Object();

private final ConcurrentMap<Class<? extends Event>, List<EventListener>> listenersCache = new ConcurrentHashMap<>();

private final Executor executor;

/**
* Constructor with an instance of {@link Executor}
*
* @param executor {@link Executor}
* @throws NullPointerException <code>executor</code> is <code>null</code>
*/
protected AbstractEventDispatcher(Executor executor) {
if (executor == null) {
throw new NullPointerException("executor must not be null");
}
this.executor = executor;
this.loadEventListenerInstances();
}

@Override
public void addEventListener(EventListener<?> listener) throws NullPointerException, IllegalArgumentException {
Listenable.assertListener(listener);
doInListener(listener, listeners -> {
addIfAbsent(listeners, listener);
});
}

@Override
public void removeEventListener(EventListener<?> listener) throws NullPointerException, IllegalArgumentException {
Listenable.assertListener(listener);
doInListener(listener, listeners -> listeners.remove(listener));
}

@Override
public List<EventListener<?>> getAllEventListeners() {
List<EventListener<?>> listeners = new LinkedList<>();

listenersCache
.entrySet()
.stream()
.map(Map.Entry::getValue)
.flatMap(Collection::stream)
.forEach(listener -> {
addIfAbsent(listeners, listener);
});

sort((List) listeners);

return unmodifiableList(listeners);
}

private <E> void addIfAbsent(Collection<E> collection, E element) {
if (!collection.contains(element)) {
collection.add(element);
}
}

@Override
public void dispatch(Event event) {

Executor executor = getExecutor();

// execute in sequential or parallel execution model
executor.execute(() -> {
listenersCache.entrySet()
.stream()
.filter(entry -> entry.getKey().isAssignableFrom(event.getClass()))
.map(Map.Entry::getValue)
.flatMap(Collection::stream)
.forEach(listener -> {
listener.onEvent(event);
});
});
}

/**
* @return the non-null {@link Executor}
*/
@Override
public final Executor getExecutor() {
return executor;
}

protected void doInListener(EventListener<?> listener, Consumer<Collection<EventListener>> consumer) {
Class<? extends Event> eventType = findEventType(listener);
if (eventType != null) {
synchronized (mutex) {
List<EventListener> listeners = listenersCache.computeIfAbsent(eventType, e -> new LinkedList<>());
// consume
consumer.accept(listeners);
// sort
sort(listeners);
}
}
}

/**
* Default, load the instances of {@link EventListener event listeners} by {@link ServiceLoader}
* <p>
* It could be override by the sub-class
*
* @see EventListener
* @see ServiceLoader#load(Class)
*/
protected void loadEventListenerInstances() {
ServiceLoader<EventListener> serviceLoader = load(EventListener.class, getClass().getClassLoader());
serviceLoader.forEach(this::addEventListener);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.event;

/**
* Direct {@link EventDispatcher} implementation uses current thread execution model
*
* @see EventDispatcher
* @since 2.7.2
*/
public final class DirectEventDispatcher extends AbstractEventDispatcher {

public DirectEventDispatcher() {
super(DIRECT_EXECUTOR);
}
}
49 changes: 49 additions & 0 deletions dubbo-event/src/main/java/org/apache/dubbo/event/Event.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.event;

import java.util.EventObject;

/**
* An event object of Dubbo is based on the Java standard {@link EventObject event}
*
* @since 2.7.2
*/
public abstract class Event extends EventObject {

private static final long serialVersionUID = -1704315605423947137L;

/**
* The timestamp of event occurs
*/
private final long timestamp;

/**
* Constructs a prototypical Event.
*
* @param source The object on which the Event initially occurred.
* @throws IllegalArgumentException if source is null.
*/
public Event(Object source) {
super(source);
this.timestamp = System.currentTimeMillis();
}

public long getTimestamp() {
return timestamp;
}
}
Loading

0 comments on commit 619e414

Please sign in to comment.