diff --git a/docs/async_message_producer.md b/docs/async_message_producer.md index 17bcf7a..1a66cd2 100644 --- a/docs/async_message_producer.md +++ b/docs/async_message_producer.md @@ -3,7 +3,7 @@ Async Message Producer [Back to documentation](../README.md#documentation) -# Usage +# Async Messaging Messaging becomes really interesting when you process your messages asynchronous. For example push your messages on a database queue, set up a cron job to periodically check the queue for new messages and process them. The bus implementations of PSB can @@ -17,10 +17,22 @@ async processing on a per message basis by routing the appropriate messages to a drivers like Doctrine DBAL and Predis (see http://bernardphp.com for a complete list of drivers) - [GuzzleHttpProducer](https://github.com/prooph/psb-http-producer): Send messages to a remote system using HTTP +- [ZeromqProducer](https://github.com/prooph/psb-zeromq-producer): Async message handling using super fast and simple to +set up ZeroMQ + +# Usage + +If you want to set up a bus that handles all messages async you can do so by attaching a [MessageProducerPlugin](plugins.md#messageproducerplugin) +initialized with your message producer of choice to a message bus. + +If you want to decide on a per message basis if the message should be handled async you can use a normal [message router](plugins.md#routers) +and configure your message producer of choice as message handler for the appropriate messages. + +*Note: The [RegexRouter](plugins.md#proophservicebuspluginrouterregexrouter) is a good choice if you want to handle all messages of a specific namespace async.* # QueryBus A async message producer for the QueryBus needs to provide a response by resolving the handed over deferred. -In a messaging system based on RabbitMQ for example you can make use of a callback queue feature. +In a messaging system based on ZeroMQ for example you can make use of request/response mode. HTTP APIs provide responses naturally. So these are both good candidates to use for remote querying. diff --git a/docs/plugins.md b/docs/plugins.md index 97a1aa5..f5ccb10 100644 --- a/docs/plugins.md +++ b/docs/plugins.md @@ -146,3 +146,26 @@ $commandBus->utilize($router); With this technique you can configure the routing for all your messages without the need to create all message handlers on every request. Only the responsible message handlers are lazy loaded by the service locator plugin. + +# MessageProducerPlugin + +If you want to route all messages to an [async message producer](async_message_producer.md) you can attach +this plugin to a message bus. If it is attached to a command or query bus all messages will only be routed to +the message producer. If it is attached to an event bus the message producer +will be added to the list of event listeners. + +```php +//Let's say the zeromq message producer is available as a service in a container +/** @var \Prooph\ServiceBus\Async\MessageProducer $zeromqProducer */ +$zeromqProducer = $container->get('async_event_producer'); + +//We now only need to set up a message producer plugin and let the message bus use it. +$messageProducerPlugin = new \Prooph\ServiceBus\Plugin\MessageProducerPlugin($zeromqProducer); + +$eventBus = new \Prooph\ServiceBus\EvenBus(); + +$eventBus->utilize($messageProducerPlugin); + +//Each event will now be routed to the async message producer +$eventBus->dispatch($domainEvent); +``` \ No newline at end of file diff --git a/src/Plugin/MessageProducerPlugin.php b/src/Plugin/MessageProducerPlugin.php new file mode 100644 index 0000000..97d344d --- /dev/null +++ b/src/Plugin/MessageProducerPlugin.php @@ -0,0 +1,69 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + * + * Date: 10/3/15 - 9:04 PM + */ +namespace Prooph\ServiceBus\Plugin; + +use Prooph\Common\Event\ActionEvent; +use Prooph\Common\Event\ActionEventEmitter; +use Prooph\Common\Event\ActionEventListenerAggregate; +use Prooph\Common\Event\DetachAggregateHandlers; +use Prooph\ServiceBus\Async\MessageProducer; +use Prooph\ServiceBus\EventBus; +use Prooph\ServiceBus\MessageBus; + +/** + * Class MessageProducerPlugin + * + * If the MessageProducerPlugin is attached to a message bus it routes all messages + * to the Prooph\ServiceBus\Async\MessageProducer it is initialized with. + * + * @package Prooph\ServiceBus\Plugin + */ +final class MessageProducerPlugin implements ActionEventListenerAggregate +{ + use DetachAggregateHandlers; + + /** + * @var MessageProducer + */ + private $messageProducer; + + /** + * @param MessageProducer $messageProducer + */ + public function __construct(MessageProducer $messageProducer) + { + $this->messageProducer = $messageProducer; + } + + /** + * @param ActionEventEmitter $emitter + */ + public function attach(ActionEventEmitter $emitter) + { + $this->trackHandler($emitter->attachListener(MessageBus::EVENT_INITIALIZE, [$this, 'onDispatchInitialize'])); + } + + /** + * @param ActionEvent $event + */ + public function onDispatchInitialize(ActionEvent $event) + { + $bus = $event->getTarget(); + + if ($bus instanceof EventBus) { + $listeners = $event->getParam(EventBus::EVENT_PARAM_EVENT_LISTENERS, []); + $listeners[] = $this->messageProducer; + $event->setParam(EventBus::EVENT_PARAM_EVENT_LISTENERS, $listeners); + } else { + $event->setParam(MessageBus::EVENT_PARAM_MESSAGE_HANDLER, $this->messageProducer); + } + } +} diff --git a/tests/Plugin/MessageProducerPluginTest.php b/tests/Plugin/MessageProducerPluginTest.php new file mode 100644 index 0000000..37161d3 --- /dev/null +++ b/tests/Plugin/MessageProducerPluginTest.php @@ -0,0 +1,90 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + * + * Date: 10/3/15 - 9:17 PM + */ +namespace Prooph\ServiceBusTest\Plugin; + +use Prooph\Common\Event\ActionEvent; +use Prooph\Common\Event\ActionEventEmitter; +use Prooph\Common\Event\ListenerHandler; +use Prooph\ServiceBus\Async\MessageProducer; +use Prooph\ServiceBus\CommandBus; +use Prooph\ServiceBus\EventBus; +use Prooph\ServiceBus\MessageBus; +use Prooph\ServiceBus\Plugin\MessageProducerPlugin; +use Prooph\ServiceBusTest\TestCase; + +/** + * Class MessageProducerPluginTest + * + * @package Prooph\ServiceBusTest\Plugin + */ +final class MessageProducerPluginTest extends TestCase +{ + /** + * @test + */ + public function it_sets_message_producer_as_message_handler_on_dispatch_initialize() + { + $messageProducer = $this->prophesize(MessageProducer::class); + $commandBus = $this->prophesize(CommandBus::class); + $actionEvent = $this->prophesize(ActionEvent::class); + $actionEventEmitter = $this->prophesize(ActionEventEmitter::class); + $listenerHandler = $this->prophesize(ListenerHandler::class); + + $messageProducerPlugin = new MessageProducerPlugin($messageProducer->reveal()); + + $actionEventEmitter + ->attachListener(MessageBus::EVENT_INITIALIZE, [$messageProducerPlugin, 'onDispatchInitialize']) + ->willReturn($listenerHandler->reveal()) + ->shouldBeCalled(); + + $messageProducerPlugin->attach($actionEventEmitter->reveal()); + + $actionEvent->getTarget()->willReturn($commandBus->reveal()); + + $actionEvent->setParam(MessageBus::EVENT_PARAM_MESSAGE_HANDLER, $messageProducer->reveal())->shouldBeCalled(); + + $messageProducerPlugin->onDispatchInitialize($actionEvent->reveal()); + } + + /** + * @test + */ + public function it_adds_message_producer_as_event_listener_on_dispatch_initialize() + { + $messageProducer = $this->prophesize(MessageProducer::class); + $eventBus = $this->prophesize(EventBus::class); + $actionEvent = $this->prophesize(ActionEvent::class); + $actionEventEmitter = $this->prophesize(ActionEventEmitter::class); + $listenerHandler = $this->prophesize(ListenerHandler::class); + + $messageProducerPlugin = new MessageProducerPlugin($messageProducer->reveal()); + + $actionEventEmitter + ->attachListener(MessageBus::EVENT_INITIALIZE, [$messageProducerPlugin, 'onDispatchInitialize']) + ->willReturn($listenerHandler->reveal()) + ->shouldBeCalled(); + + $messageProducerPlugin->attach($actionEventEmitter->reveal()); + + $actionEvent->getTarget()->willReturn($eventBus->reveal()); + + $eventListeners = ['i_am_an_event_listener']; + + $actionEvent->getParam(EventBus::EVENT_PARAM_EVENT_LISTENERS, [])->willReturn($eventListeners); + + //Message Producer should be added to list of event listeners + $eventListeners[] = $messageProducer->reveal(); + + $actionEvent->setParam(EventBus::EVENT_PARAM_EVENT_LISTENERS, $eventListeners)->shouldBeCalled(); + + $messageProducerPlugin->onDispatchInitialize($actionEvent->reveal()); + } +}