Skip to content

Commit

Permalink
Polish apache/incubator-dubbo/apache#4096 : To add new module for Dub…
Browse files Browse the repository at this point in the history
…bo Event
  • Loading branch information
mercyblitz committed May 20, 2019
1 parent d0d1a65 commit 2f20ddd
Show file tree
Hide file tree
Showing 22 changed files with 1,131 additions and 9 deletions.
9 changes: 9 additions & 0 deletions dubbo-all/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,15 @@
<optional>true</optional>
</dependency>

<!-- 2.7.2 new modules -->
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-event</artifactId>
<version>${project.version}</version>
<scope>compile</scope>
<optional>true</optional>
</dependency>

<!-- Transitive dependencies -->
<dependency>
<groupId>org.springframework</groupId>
Expand Down
30 changes: 30 additions & 0 deletions dubbo-event/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-parent</artifactId>
<version>${revision}</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>dubbo-event</artifactId>
<packaging>jar</packaging>

<name>dubbo-event</name>
<description>The event module of Dubbo project</description>

<dependencies>
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-common</artifactId>
<version>${revision}</version>
<optional>true</optional>
</dependency>

</dependencies>


</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.event;

import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

import static java.util.Collections.sort;
import static java.util.Collections.unmodifiableList;
import static java.util.ServiceLoader.load;
import static org.apache.dubbo.event.EventListener.findEventType;

/**
* The abstract {@link EventDispatcher} providers the common implementation.
*
* @see EventDispatcher
* @see Listenable
* @see ServiceLoader
* @see EventListener
* @see Event
* @since 2.7.2
*/
public abstract class AbstractEventDispatcher implements EventDispatcher {

private final Object mutex = new Object();

private final ConcurrentMap<Class<? extends Event>, List<EventListener>> listenersCache = new ConcurrentHashMap<>();

private final Executor executor;

/**
* Constructor with an instance of {@link Executor}
*
* @param executor {@link Executor}
* @throws NullPointerException <code>executor</code> is <code>null</code>
*/
protected AbstractEventDispatcher(Executor executor) {
if (executor == null) {
throw new NullPointerException("executor must not be null");
}
this.executor = executor;
this.loadEventListenerInstances();
}

@Override
public void addEventListener(EventListener<?> listener) throws NullPointerException, IllegalArgumentException {
Listenable.assertListener(listener);
doInListener(listener, listeners -> {
addIfAbsent(listeners, listener);
});
}

@Override
public void removeEventListener(EventListener<?> listener) throws NullPointerException, IllegalArgumentException {
Listenable.assertListener(listener);
doInListener(listener, listeners -> listeners.remove(listener));
}

@Override
public List<EventListener<?>> getAllEventListeners() {
List<EventListener<?>> listeners = new LinkedList<>();

listenersCache
.entrySet()
.stream()
.map(Map.Entry::getValue)
.flatMap(Collection::stream)
.forEach(listener -> {
addIfAbsent(listeners, listener);
});

sort((List) listeners);

return unmodifiableList(listeners);
}

private <E> void addIfAbsent(Collection<E> collection, E element) {
if (!collection.contains(element)) {
collection.add(element);
}
}

@Override
public void dispatch(Event event) {

Executor executor = getExecutor();

// execute in sequential or parallel execution model
executor.execute(() -> {
listenersCache.entrySet()
.stream()
.filter(entry -> entry.getKey().isAssignableFrom(event.getClass()))
.map(Map.Entry::getValue)
.flatMap(Collection::stream)
.forEach(listener -> {
listener.onEvent(event);
});
});
}

/**
* @return the non-null {@link Executor}
*/
@Override
public final Executor getExecutor() {
return executor;
}

protected void doInListener(EventListener<?> listener, Consumer<Collection<EventListener>> consumer) {
Class<? extends Event> eventType = findEventType(listener);
if (eventType != null) {
synchronized (mutex) {
List<EventListener> listeners = listenersCache.computeIfAbsent(eventType, e -> new LinkedList<>());
// consume
consumer.accept(listeners);
// sort
sort(listeners);
}
}
}

/**
* Default, load the instances of {@link EventListener event listeners} by {@link ServiceLoader}
* <p>
* It could be override by the sub-class
*
* @see EventListener
* @see ServiceLoader#load(Class)
*/
protected void loadEventListenerInstances() {
ServiceLoader<EventListener> serviceLoader = load(EventListener.class, getClass().getClassLoader());
serviceLoader.forEach(this::addEventListener);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.event;

/**
* Direct {@link EventDispatcher} implementation uses current thread execution model
*
* @see EventDispatcher
* @since 2.7.2
*/
public final class DirectEventDispatcher extends AbstractEventDispatcher {

public DirectEventDispatcher() {
super(DIRECT_EXECUTOR);
}
}
49 changes: 49 additions & 0 deletions dubbo-event/src/main/java/org/apache/dubbo/event/Event.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.event;

import java.util.EventObject;

/**
* An event object of Dubbo is based on the Java standard {@link EventObject event}
*
* @since 2.7.2
*/
public abstract class Event extends EventObject {

private static final long serialVersionUID = -1704315605423947137L;

/**
* The timestamp of event occurs
*/
private final long timestamp;

/**
* Constructs a prototypical Event.
*
* @param source The object on which the Event initially occurred.
* @throws IllegalArgumentException if source is null.
*/
public Event(Object source) {
super(source);
this.timestamp = System.currentTimeMillis();
}

public long getTimestamp() {
return timestamp;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.event;

import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.extension.SPI;

import java.util.concurrent.Executor;

/**
* {@link Event Dubbo Event} Dispatcher
*
* @see Event
* @see EventListener
* @see DirectEventDispatcher
* @since 2.7.2
*/
@SPI("direct")
public interface EventDispatcher extends Listenable<EventListener<?>> {

/**
* Direct {@link Executor} uses sequential execution model
*/
Executor DIRECT_EXECUTOR = Runnable::run;

/**
* Dispatch a Dubbo event to the registered {@link EventListener Dubbo event listeners}
*
* @param event a {@link Event Dubbo event}
*/
void dispatch(Event event);

/**
* The {@link Executor} to dispatch a {@link Event Dubbo event}
*
* @return default implementation directly invoke {@link Runnable#run()} method, rather than multiple-threaded
* {@link Executor}. If the return value is <code>null</code>, the behavior is same as default.
* @see #DIRECT_EXECUTOR
*/
default Executor getExecutor() {
return DIRECT_EXECUTOR;
}

/**
* The default extension of {@link EventDispatcher} is loaded by {@link ExtensionLoader}
*
* @return the default extension of {@link EventDispatcher}
*/
static EventDispatcher getDefaultExtension() {
return ExtensionLoader.getExtensionLoader(EventDispatcher.class).getDefaultExtension();
}
}
Loading

0 comments on commit 2f20ddd

Please sign in to comment.