Skip to content

Powerful event-bus optimized for high throughput in multi-threaded applications. Features: Sync and Async event publication, weak/strong references, event filtering, annotation driven

License

Notifications You must be signed in to change notification settings

bcaglayan/mbassador

 
 

Repository files navigation

build status maven central javadoc wiki

MBassador

MBassador is a light-weight, high-performance event bus implementing the publish subscribe pattern. It is designed for ease of use and aims to be feature rich and extensible while preserving resource efficiency and performance.

The core of MBassador is built around a custom data structure that provides non-blocking reads and minimized lock contention for writes such that performance degradation of concurrent read/write access is minimal. Benchmarks that illustrate the advantages of this design are available in this github repository.

wiki

The code is production ready: 86% instruction coverage, 82% branch coverage with randomized and concurrently run test sets, no major bug has been reported in the last 18 month. No modifications to the core will be made without thoroughly testing the code.

Usage | Features | Installation | Wiki | Release Notes | Integrations | Credits | Contribute | License

Usage

Using MBassador in your project is very easy. Create as many instances of MBassador as you like (usually a singleton will do) bus = new MBassador(), mark and configure your message handlers with @Handler annotations and finally register the listeners at any MBassador instance bus.subscribe(aListener). Start sending messages to your listeners using one of MBassador's publication methods bus.post(message).now() or bus.post(message).asynchronously().

As a first reference, consider this illustrative example. You might want to have a look at the collection of examples to see its features on more detail.

      
// Define your handlers

@Listener(references = References.Strong)
class SimpleFileListener{

    @Handler
    public void handle(File file){
      // do something with the file
    }
    
    @Handler(delivery = Invoke.Asynchronously)
    public void expensiveOperation(File file){
      // do something with the file
    }
    
    @Handler(condition = "msg.size >= 10000")
    @Enveloped(messages = {HashMap.class, LinkedList.class})
    public void handleLarge(MessageEnvelope envelope) {
       // handle objects without common super type
    }

}

// somewhere else in your code

MBassador bus = new MBassador();
bus.subscribe (new SimpleFileListener());
bus.post(new File("/tmp/smallfile.csv")).now();
bus.post(new File("/tmp/bigfile.csv")).asynchronously();

Features

Annotation driven

Annotation Function
@Handler Mark a method as message handler
@Listener Can be used to customize listener wide configuration like the used reference type
@Enveloped A message envelope can be used to pass messages of different types into a single handler
@Filter Add filtering to prevent certain messages from being published

Delivers everything, respects type hierarchy

Messages do not need to implement any interface and can be of any type. The class hierarchy of a message is considered during message delivery, such that handlers will also receive subtypes of the message type they consume for - e.g. a handler of Object.class receives everything. Messages that do not match any handler result in the publication of a DeadMessage object which wraps the original message. DeadMessage events can be handled by registering listeners that handle DeadMessage.

Synchronous and asynchronous message delivery

There are two types of (a-)synchronicity when using MBassador: message dispatch and handler invocation. Message dispatch

Synchronous dispatch means that the publish method blocks until all handlers have been processed. Note: This does not necessarily imply that each handler has been invoked and received the message - due to the possibility to combine synchronous dispatch with asynchronous handlers. This is the semantics of publish(Object obj) and post(Objec obj).now()

Asynchronous dispatch means that the publish method returns immediately and the message will be dispatched in another thread (fire and forget). This is the semantics of publishAsync(Object obj) and post(Objec obj).asynchronously()

Handler invocation

Synchronous handlers are invoked sequentially and from the same thread within a running publication. Asynchronous handlers means that the actual handler invocation is pushed to a queue that is processed by a pool of worker threads.

Configurable reference types

By default, MBassador uses weak references for listeners to relieve the programmer of the need to explicitly unsubscribe listeners that are not used anymore and avoid memory-leaks. This is very comfortable in container managed environments where listeners are created and destroyed by frameworks, i.e. Spring, Guice etc. Just add everything to the bus, it will ignore objects without handlers and automatically clean-up orphaned weak references after the garbage collector has done its job.

Instead of using weak references, a listener can be configured to be referenced using strong references using @Listener(references=References.Strong). Strongly referenced listeners will stick around until explicitly unsubscribed.

Message filtering

MBassador offers static message filtering. Filters are configured using annotations and multiple filters can be attached to a single message handler. Since version 1.2.0 Java EL expressions in @Handler are another way to define conditional message dispatch. Messages that have matching handlers but do not pass the configured filters result in the publication of a FilteredMessage object which wraps the original message. FilteredMessage events can be handled by registering listeners that handle FilteredMessage.

Note: Since version 1.3.1 it is possible to wrap a filter in a custom annotation for reuse

    public static final class RejectAllFilter implements IMessageFilter {

        @Override
        public boolean accepts(Object event,  SubscriptionContext context) {
            return false;
        }
    }

    @IncludeFilters({@Filter(RejectAllFilter.class)})
    @Retention(RetentionPolicy.RUNTIME)
    public @interface RejectAll {}
    
    public static class FilteredMessageListener{
    
        // will cause republication of a FilteredEvent
        @Handler
        @RejectAll
        public void handleNone(Object any){
            FilteredEventCounter.incrementAndGet();
        }

        
    }

Enveloped messages

Message handlers can declare to receive an enveloped message using Enveloped. The envelope can wrap different types of messages to allow a single handler to handle multiple, unrelated message types.

Handler priorities

A handler can be associated with a priority to influence the order in which messages are delivered when multiple matching handlers exist

Custom error handling

Errors during message delivery are sent to all registered error handlers which can be added to the bus as necessary.

Extensibility

MBassador is designed to be extensible with custom implementations of various components like message dispatchers and handler invocations (using the decorator pattern), metadata reader (you can add your own annotations) and factories for different kinds of objects. A configuration object is used to customize the different configurable parts, see Features

Installation

MBassador is available from the Maven Central Repository using the following coordinates:

<dependency>
    <groupId>net.engio</groupId>
    <artifactId>mbassador</artifactId>
    <version>{see.git.tags.for.latest.version}</version>
</dependency>

You can also download binary release and javadoc from the maven central repository. Of course you can always clone the repository and build from source.

Documentation

There is ongoing effort to extend documentation and provide code samples and detailed explanations of how the message bus works. Code samples can also be found in the various test cases. Please read about the terminology used in this project to avoid confusion and misunderstanding.

Integrations

There is a spring-extension available to support CDI-like transactional message sending in a Spring environment. This is a good example of integration with other frameworks. Another example is the Guice integration.

Credits

The initial inspiration for creating this component comes from Google Guava's event bus implementation. I liked the simplicity of its design and I trust in the code quality of google libraries. Unfortunately it uses strong references only.

Thanks to all contributors, especially

Many thanks also to ej-technologies for providing an open source license of JProfiler and Jetbrains for a license of IntelliJ IDEA

OSS used by MBassador: jUnit | maven | mockito | slf4j | Odysseus JUEL

Contribute

Pick an issue from the list of open issues and start implementing. Make your PRs small and provide test code! Take a look at this issue for a good example.

Note: Due to the complexity of the data structure and synchronization code it took quite a while to get a stable core. New features will only be implemented if they do not require significant modification to the core. The primary focus of MBassador is to provide high-performance extended pub/sub.

Sample code and documentation are both very appreciated contributions. Especially integration with different frameworks is of great value. Feel free and welcome to create Wiki pages to share your code and ideas. Example: Guice integration

License

This project is distributed under the terms of the MIT License. See file "LICENSE" for further reference.

About

Powerful event-bus optimized for high throughput in multi-threaded applications. Features: Sync and Async event publication, weak/strong references, event filtering, annotation driven

Resources

License

Stars

Watchers

Forks

Packages

No packages published

Languages

  • Java 99.9%
  • Shell 0.1%