Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add message producer plugin #74

Merged
merged 2 commits into from
Oct 4, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions docs/async_message_producer.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ Async Message Producer

[Back to documentation](../README.md#documentation)

# Usage
# Async Messaging

Messaging becomes really interesting when you process your messages asynchronous. For example push your messages on a database queue,
set up a cron job to periodically check the queue for new messages and process them. The bus implementations of PSB can
Expand All @@ -17,10 +17,22 @@ async processing on a per message basis by routing the appropriate messages to a
drivers like Doctrine DBAL and Predis (see http://bernardphp.com for a complete list of drivers)
- [GuzzleHttpProducer](https://github.com/prooph/psb-http-producer): Send messages to a remote system using
HTTP
- [ZeromqProducer](https://github.com/prooph/psb-zeromq-producer): Async message handling using super fast and simple to
set up ZeroMQ

# Usage

If you want to set up a bus that handles all messages async you can do so by attaching a [MessageProducerPlugin](plugins.md#messageproducerplugin)
initialized with your message producer of choice to a message bus.

If you want to decide on a per message basis if the message should be handled async you can use a normal [message router](plugins.md#routers)
and configure your message producer of choice as message handler for the appropriate messages.

*Note: The [RegexRouter](plugins.md#proophservicebuspluginrouterregexrouter) is a good choice if you want to handle all messages of a specific namespace async.*

# QueryBus

A async message producer for the QueryBus needs to provide a response by resolving the handed over deferred.
In a messaging system based on RabbitMQ for example you can make use of a callback queue feature.
In a messaging system based on ZeroMQ for example you can make use of request/response mode.
HTTP APIs provide responses naturally.
So these are both good candidates to use for remote querying.
23 changes: 23 additions & 0 deletions docs/plugins.md
Original file line number Diff line number Diff line change
Expand Up @@ -146,3 +146,26 @@ $commandBus->utilize($router);

With this technique you can configure the routing for all your messages without the need to create all message handlers
on every request. Only the responsible message handlers are lazy loaded by the service locator plugin.

# MessageProducerPlugin

If you want to route all messages to an [async message producer](async_message_producer.md) you can attach
this plugin to a message bus. If it is attached to a command or query bus all messages will only be routed to
the message producer. If it is attached to an event bus the message producer
will be added to the list of event listeners.

```php
//Let's say the zeromq message producer is available as a service in a container
/** @var \Prooph\ServiceBus\Async\MessageProducer $zeromqProducer */
$zeromqProducer = $container->get('async_event_producer');

//We now only need to set up a message producer plugin and let the message bus use it.
$messageProducerPlugin = new \Prooph\ServiceBus\Plugin\MessageProducerPlugin($zeromqProducer);

$eventBus = new \Prooph\ServiceBus\EvenBus();

$eventBus->utilize($messageProducerPlugin);

//Each event will now be routed to the async message producer
$eventBus->dispatch($domainEvent);
```
69 changes: 69 additions & 0 deletions src/Plugin/MessageProducerPlugin.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
<?php
/*
* This file is part of the prooph/service-bus.
* (c) 2014-2015 prooph software GmbH <contact@prooph.de>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*
* Date: 10/3/15 - 9:04 PM
*/
namespace Prooph\ServiceBus\Plugin;

use Prooph\Common\Event\ActionEvent;
use Prooph\Common\Event\ActionEventEmitter;
use Prooph\Common\Event\ActionEventListenerAggregate;
use Prooph\Common\Event\DetachAggregateHandlers;
use Prooph\ServiceBus\Async\MessageProducer;
use Prooph\ServiceBus\EventBus;
use Prooph\ServiceBus\MessageBus;

/**
* Class MessageProducerPlugin
*
* If the MessageProducerPlugin is attached to a message bus it routes all messages
* to the Prooph\ServiceBus\Async\MessageProducer it is initialized with.
*
* @package Prooph\ServiceBus\Plugin
*/
final class MessageProducerPlugin implements ActionEventListenerAggregate
{
use DetachAggregateHandlers;

/**
* @var MessageProducer
*/
private $messageProducer;

/**
* @param MessageProducer $messageProducer
*/
public function __construct(MessageProducer $messageProducer)
{
$this->messageProducer = $messageProducer;
}

/**
* @param ActionEventEmitter $emitter
*/
public function attach(ActionEventEmitter $emitter)
{
$this->trackHandler($emitter->attachListener(MessageBus::EVENT_INITIALIZE, [$this, 'onDispatchInitialize']));
}

/**
* @param ActionEvent $event
*/
public function onDispatchInitialize(ActionEvent $event)
{
$bus = $event->getTarget();

if ($bus instanceof EventBus) {
$listeners = $event->getParam(EventBus::EVENT_PARAM_EVENT_LISTENERS, []);
$listeners[] = $this->messageProducer;
$event->setParam(EventBus::EVENT_PARAM_EVENT_LISTENERS, $listeners);
} else {
$event->setParam(MessageBus::EVENT_PARAM_MESSAGE_HANDLER, $this->messageProducer);
}
}
}
90 changes: 90 additions & 0 deletions tests/Plugin/MessageProducerPluginTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
<?php
/*
* This file is part of the prooph/service-bus.
* (c) 2014-2015 prooph software GmbH <contact@prooph.de>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*
* Date: 10/3/15 - 9:17 PM
*/
namespace Prooph\ServiceBusTest\Plugin;

use Prooph\Common\Event\ActionEvent;
use Prooph\Common\Event\ActionEventEmitter;
use Prooph\Common\Event\ListenerHandler;
use Prooph\ServiceBus\Async\MessageProducer;
use Prooph\ServiceBus\CommandBus;
use Prooph\ServiceBus\EventBus;
use Prooph\ServiceBus\MessageBus;
use Prooph\ServiceBus\Plugin\MessageProducerPlugin;
use Prooph\ServiceBusTest\TestCase;

/**
* Class MessageProducerPluginTest
*
* @package Prooph\ServiceBusTest\Plugin
*/
final class MessageProducerPluginTest extends TestCase
{
/**
* @test
*/
public function it_sets_message_producer_as_message_handler_on_dispatch_initialize()
{
$messageProducer = $this->prophesize(MessageProducer::class);
$commandBus = $this->prophesize(CommandBus::class);
$actionEvent = $this->prophesize(ActionEvent::class);
$actionEventEmitter = $this->prophesize(ActionEventEmitter::class);
$listenerHandler = $this->prophesize(ListenerHandler::class);

$messageProducerPlugin = new MessageProducerPlugin($messageProducer->reveal());

$actionEventEmitter
->attachListener(MessageBus::EVENT_INITIALIZE, [$messageProducerPlugin, 'onDispatchInitialize'])
->willReturn($listenerHandler->reveal())
->shouldBeCalled();

$messageProducerPlugin->attach($actionEventEmitter->reveal());

$actionEvent->getTarget()->willReturn($commandBus->reveal());

$actionEvent->setParam(MessageBus::EVENT_PARAM_MESSAGE_HANDLER, $messageProducer->reveal())->shouldBeCalled();

$messageProducerPlugin->onDispatchInitialize($actionEvent->reveal());
}

/**
* @test
*/
public function it_adds_message_producer_as_event_listener_on_dispatch_initialize()
{
$messageProducer = $this->prophesize(MessageProducer::class);
$eventBus = $this->prophesize(EventBus::class);
$actionEvent = $this->prophesize(ActionEvent::class);
$actionEventEmitter = $this->prophesize(ActionEventEmitter::class);
$listenerHandler = $this->prophesize(ListenerHandler::class);

$messageProducerPlugin = new MessageProducerPlugin($messageProducer->reveal());

$actionEventEmitter
->attachListener(MessageBus::EVENT_INITIALIZE, [$messageProducerPlugin, 'onDispatchInitialize'])
->willReturn($listenerHandler->reveal())
->shouldBeCalled();

$messageProducerPlugin->attach($actionEventEmitter->reveal());

$actionEvent->getTarget()->willReturn($eventBus->reveal());

$eventListeners = ['i_am_an_event_listener'];

$actionEvent->getParam(EventBus::EVENT_PARAM_EVENT_LISTENERS, [])->willReturn($eventListeners);

//Message Producer should be added to list of event listeners
$eventListeners[] = $messageProducer->reveal();

$actionEvent->setParam(EventBus::EVENT_PARAM_EVENT_LISTENERS, $eventListeners)->shouldBeCalled();

$messageProducerPlugin->onDispatchInitialize($actionEvent->reveal());
}
}