Skip to content

Commit

Permalink
Do not process already process messages with a custom strategy
Browse files Browse the repository at this point in the history
  • Loading branch information
lunetics committed Apr 24, 2017
1 parent 330f200 commit afa40e8
Show file tree
Hide file tree
Showing 7 changed files with 134 additions and 0 deletions.
4 changes: 4 additions & 0 deletions src/Plugin/InvokeStrategy/FinderInvokeStrategy.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ function (ActionEvent $actionEvent): void {

$deferred = $actionEvent->getParam(QueryBus::EVENT_PARAM_DEFERRED);

if ($actionEvent->getParam(MessageBus::EVENT_PARAM_MESSAGE_HANDLED, false)) {
return;
}

if (is_object($finder)) {
$finder->find($query, $deferred);
$actionEvent->setParam(MessageBus::EVENT_PARAM_MESSAGE_HANDLED, true);
Expand Down
4 changes: 4 additions & 0 deletions src/Plugin/InvokeStrategy/HandleCommandStrategy.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ function (ActionEvent $actionEvent): void {
$message = $actionEvent->getParam(MessageBus::EVENT_PARAM_MESSAGE);
$handler = $actionEvent->getParam(MessageBus::EVENT_PARAM_MESSAGE_HANDLER);

if ($actionEvent->getParam(MessageBus::EVENT_PARAM_MESSAGE_HANDLED, false)) {
return;
}

$handler->handle($message);
$actionEvent->setParam(MessageBus::EVENT_PARAM_MESSAGE_HANDLED, true);
},
Expand Down
4 changes: 4 additions & 0 deletions src/Plugin/InvokeStrategy/OnEventStrategy.php
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ function (ActionEvent $actionEvent): void {
$message = $actionEvent->getParam(MessageBus::EVENT_PARAM_MESSAGE);
$handlers = $actionEvent->getParam(EventBus::EVENT_PARAM_EVENT_LISTENERS, []);

if ($actionEvent->getParam(MessageBus::EVENT_PARAM_MESSAGE_HANDLED, false)) {
return;
}

foreach ($handlers as $handler) {
$handler->onEvent($message);
}
Expand Down
36 changes: 36 additions & 0 deletions tests/Mock/CustomInvokableMessageHandler.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
<?php
/**
* This file is part of the prooph/service-bus.
* (c) 2014-2017 prooph software GmbH <contact@prooph.de>
* (c) 2015-2017 Sascha-Oliver Prolic <saschaprolic@googlemail.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

declare(strict_types=1);

namespace ProophTest\ServiceBus\Mock;

class CustomInvokableMessageHandler
{
private $lastMessage;

private $invokeCounter = 0;

public function __invoke($message): void
{
$this->lastMessage = $message;
$this->invokeCounter++;
}

public function getLastMessage()
{
return $this->lastMessage;
}

public function getInvokeCounter(): int
{
return $this->invokeCounter;
}
}
33 changes: 33 additions & 0 deletions tests/Plugin/InvokeStrategy/FinderInvokeStrategyTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
use Prooph\Common\Event\ActionEvent;
use Prooph\ServiceBus\Plugin\InvokeStrategy\FinderInvokeStrategy;
use Prooph\ServiceBus\QueryBus;
use ProophTest\ServiceBus\Mock\CustomInvokableMessageHandler;
use ProophTest\ServiceBus\Mock\Finder;

class FinderInvokeStrategyTest extends TestCase
Expand Down Expand Up @@ -43,4 +44,36 @@ function (ActionEvent $actionEvent) use ($finder): void {
$queryBus->dispatch('foo');
$this->assertEquals('foo', $finder->getLastMessage());
}

/**
* @test
*/
public function it_should_not_handle_already_processed_messages(): void
{
$queryBus = new QueryBus();

$finderInvokeStrategy = new FinderInvokeStrategy();
$finderInvokeStrategy->attachToMessageBus($queryBus);

$finder = new CustomInvokableMessageHandler();

$queryBus->attach(
QueryBus::EVENT_DISPATCH,
function (ActionEvent $actionEvent) use ($finder): void {
$actionEvent->setParam(QueryBus::EVENT_PARAM_MESSAGE_HANDLER, $finder);
},
QueryBus::PRIORITY_INITIALIZE
);

$promise = $queryBus->dispatch('foo');

$promise->otherwise(function ($ex) use (&$exception): void {
$exception = $ex;
});

$this->assertNull($exception);

$this->assertEquals('foo', $finder->getLastMessage());
$this->assertSame(1, $finder->getInvokeCounter());
}
}
28 changes: 28 additions & 0 deletions tests/Plugin/InvokeStrategy/HandleCommandStrategyTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
use Prooph\Common\Event\ActionEvent;
use Prooph\ServiceBus\CommandBus;
use Prooph\ServiceBus\Plugin\InvokeStrategy\HandleCommandStrategy;
use ProophTest\ServiceBus\Mock\CustomInvokableMessageHandler;
use ProophTest\ServiceBus\Mock\CustomMessage;
use ProophTest\ServiceBus\Mock\CustomMessageCommandHandler;
use ProophTest\ServiceBus\Mock\MessageHandler;
Expand Down Expand Up @@ -46,6 +47,7 @@ function (ActionEvent $event) use ($commandHandler): void {
$commandBus->dispatch($doSomething);

$this->assertSame($doSomething, $commandHandler->getLastMessage());
$this->assertSame(1, $commandHandler->getInvokeCounter());
}

/**
Expand Down Expand Up @@ -73,4 +75,30 @@ function (ActionEvent $event) use ($commandHandler): void {

$this->assertSame($doSomething, $commandHandler->getLastMessage());
}

/**
* @test
*/
public function it_should_not_handle_already_processed_messages(): void
{
$commandHandler = new CustomInvokableMessageHandler();

$commandBus = new CommandBus();
$commandBus->attach(
CommandBus::EVENT_DISPATCH,
function (ActionEvent $event) use ($commandHandler): void {
$event->setParam(CommandBus::EVENT_PARAM_MESSAGE_HANDLER, $commandHandler);
},
CommandBus::PRIORITY_ROUTE
);

$handleCommandStrategy = new HandleCommandStrategy();
$handleCommandStrategy->attachToMessageBus($commandBus);

$doSomething = new CustomMessage('I am a command');

$commandBus->dispatch($doSomething);

$this->assertSame($doSomething, $commandHandler->getLastMessage());
}
}
25 changes: 25 additions & 0 deletions tests/Plugin/InvokeStrategy/OnEventStrategyTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
use Prooph\ServiceBus\EventBus;
use Prooph\ServiceBus\Plugin\InvokeStrategy\OnEventStrategy;
use Prooph\ServiceBus\Plugin\Router\EventRouter;
use ProophTest\ServiceBus\Mock\CustomInvokableMessageHandler;
use ProophTest\ServiceBus\Mock\CustomMessage;
use ProophTest\ServiceBus\Mock\CustomMessageEventHandler;
use Prophecy\Argument;
Expand Down Expand Up @@ -66,4 +67,28 @@ function () {

$onEventStrategy->attachToMessageBus($bus->reveal());
}

/**
* @test
*/
public function it_should_not_handle_already_processed_messages(): void
{
$eventBus = new EventBus();

$onEventStrategy = new OnEventStrategy();
$onEventStrategy->attachToMessageBus($eventBus);

$callableHandler = new CustomInvokableMessageHandler();

$eventRouter = new EventRouter([
'ProophTest\ServiceBus\Mock\CustomMessage' => $callableHandler,
]);
$eventRouter->attachToMessageBus($eventBus);

$customEvent = new CustomMessage('I am an event');
$eventBus->dispatch($customEvent);

$this->assertSame($customEvent, $callableHandler->getLastMessage());
$this->assertSame(1, $callableHandler->getInvokeCounter());
}
}

0 comments on commit afa40e8

Please sign in to comment.