diff --git a/src/Plugin/InvokeStrategy/FinderInvokeStrategy.php b/src/Plugin/InvokeStrategy/FinderInvokeStrategy.php index 673359d..4f7f2bc 100644 --- a/src/Plugin/InvokeStrategy/FinderInvokeStrategy.php +++ b/src/Plugin/InvokeStrategy/FinderInvokeStrategy.php @@ -24,6 +24,10 @@ public function attachToMessageBus(MessageBus $messageBus): void $this->listenerHandlers[] = $messageBus->attach( QueryBus::EVENT_DISPATCH, function (ActionEvent $actionEvent): void { + if ($actionEvent->getParam(MessageBus::EVENT_PARAM_MESSAGE_HANDLED, false)) { + return; + } + $finder = $actionEvent->getParam(QueryBus::EVENT_PARAM_MESSAGE_HANDLER); $query = $actionEvent->getParam(QueryBus::EVENT_PARAM_MESSAGE); diff --git a/src/Plugin/InvokeStrategy/HandleCommandStrategy.php b/src/Plugin/InvokeStrategy/HandleCommandStrategy.php index 13b5bf5..5566aae 100644 --- a/src/Plugin/InvokeStrategy/HandleCommandStrategy.php +++ b/src/Plugin/InvokeStrategy/HandleCommandStrategy.php @@ -23,6 +23,10 @@ public function attachToMessageBus(MessageBus $messageBus): void $this->listenerHandlers[] = $messageBus->attach( MessageBus::EVENT_DISPATCH, function (ActionEvent $actionEvent): void { + if ($actionEvent->getParam(MessageBus::EVENT_PARAM_MESSAGE_HANDLED, false)) { + return; + } + $message = $actionEvent->getParam(MessageBus::EVENT_PARAM_MESSAGE); $handler = $actionEvent->getParam(MessageBus::EVENT_PARAM_MESSAGE_HANDLER); diff --git a/src/Plugin/InvokeStrategy/OnEventStrategy.php b/src/Plugin/InvokeStrategy/OnEventStrategy.php index b069680..b160fdf 100644 --- a/src/Plugin/InvokeStrategy/OnEventStrategy.php +++ b/src/Plugin/InvokeStrategy/OnEventStrategy.php @@ -24,6 +24,10 @@ public function attachToMessageBus(MessageBus $messageBus): void $this->listenerHandlers[] = $messageBus->attach( MessageBus::EVENT_DISPATCH, function (ActionEvent $actionEvent): void { + if ($actionEvent->getParam(MessageBus::EVENT_PARAM_MESSAGE_HANDLED, false)) { + return; + } + $message = $actionEvent->getParam(MessageBus::EVENT_PARAM_MESSAGE); $handlers = $actionEvent->getParam(EventBus::EVENT_PARAM_EVENT_LISTENERS, []); diff --git a/tests/Mock/CustomInvokableMessageHandler.php b/tests/Mock/CustomInvokableMessageHandler.php new file mode 100644 index 0000000..75c77d1 --- /dev/null +++ b/tests/Mock/CustomInvokableMessageHandler.php @@ -0,0 +1,36 @@ + + * (c) 2015-2017 Sascha-Oliver Prolic + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +declare(strict_types=1); + +namespace ProophTest\ServiceBus\Mock; + +class CustomInvokableMessageHandler +{ + private $lastMessage; + + private $invokeCounter = 0; + + public function __invoke($message): void + { + $this->lastMessage = $message; + $this->invokeCounter++; + } + + public function getLastMessage() + { + return $this->lastMessage; + } + + public function getInvokeCounter(): int + { + return $this->invokeCounter; + } +} diff --git a/tests/Plugin/InvokeStrategy/FinderInvokeStrategyTest.php b/tests/Plugin/InvokeStrategy/FinderInvokeStrategyTest.php index 096b268..e83f26f 100644 --- a/tests/Plugin/InvokeStrategy/FinderInvokeStrategyTest.php +++ b/tests/Plugin/InvokeStrategy/FinderInvokeStrategyTest.php @@ -16,6 +16,7 @@ use Prooph\Common\Event\ActionEvent; use Prooph\ServiceBus\Plugin\InvokeStrategy\FinderInvokeStrategy; use Prooph\ServiceBus\QueryBus; +use ProophTest\ServiceBus\Mock\CustomInvokableMessageHandler; use ProophTest\ServiceBus\Mock\Finder; class FinderInvokeStrategyTest extends TestCase @@ -43,4 +44,36 @@ function (ActionEvent $actionEvent) use ($finder): void { $queryBus->dispatch('foo'); $this->assertEquals('foo', $finder->getLastMessage()); } + + /** + * @test + */ + public function it_should_not_handle_already_processed_messages(): void + { + $queryBus = new QueryBus(); + + $finderInvokeStrategy = new FinderInvokeStrategy(); + $finderInvokeStrategy->attachToMessageBus($queryBus); + + $finder = new CustomInvokableMessageHandler(); + + $queryBus->attach( + QueryBus::EVENT_DISPATCH, + function (ActionEvent $actionEvent) use ($finder): void { + $actionEvent->setParam(QueryBus::EVENT_PARAM_MESSAGE_HANDLER, $finder); + }, + QueryBus::PRIORITY_INITIALIZE + ); + + $promise = $queryBus->dispatch('foo'); + + $promise->otherwise(function ($ex) use (&$exception): void { + $exception = $ex; + }); + + $this->assertNull($exception); + + $this->assertEquals('foo', $finder->getLastMessage()); + $this->assertSame(1, $finder->getInvokeCounter()); + } } diff --git a/tests/Plugin/InvokeStrategy/HandleCommandStrategyTest.php b/tests/Plugin/InvokeStrategy/HandleCommandStrategyTest.php index 00d2c6a..508b44c 100644 --- a/tests/Plugin/InvokeStrategy/HandleCommandStrategyTest.php +++ b/tests/Plugin/InvokeStrategy/HandleCommandStrategyTest.php @@ -16,6 +16,7 @@ use Prooph\Common\Event\ActionEvent; use Prooph\ServiceBus\CommandBus; use Prooph\ServiceBus\Plugin\InvokeStrategy\HandleCommandStrategy; +use ProophTest\ServiceBus\Mock\CustomInvokableMessageHandler; use ProophTest\ServiceBus\Mock\CustomMessage; use ProophTest\ServiceBus\Mock\CustomMessageCommandHandler; use ProophTest\ServiceBus\Mock\MessageHandler; @@ -46,6 +47,7 @@ function (ActionEvent $event) use ($commandHandler): void { $commandBus->dispatch($doSomething); $this->assertSame($doSomething, $commandHandler->getLastMessage()); + $this->assertSame(1, $commandHandler->getInvokeCounter()); } /** @@ -73,4 +75,30 @@ function (ActionEvent $event) use ($commandHandler): void { $this->assertSame($doSomething, $commandHandler->getLastMessage()); } + + /** + * @test + */ + public function it_should_not_handle_already_processed_messages(): void + { + $commandHandler = new CustomInvokableMessageHandler(); + + $commandBus = new CommandBus(); + $commandBus->attach( + CommandBus::EVENT_DISPATCH, + function (ActionEvent $event) use ($commandHandler): void { + $event->setParam(CommandBus::EVENT_PARAM_MESSAGE_HANDLER, $commandHandler); + }, + CommandBus::PRIORITY_ROUTE + ); + + $handleCommandStrategy = new HandleCommandStrategy(); + $handleCommandStrategy->attachToMessageBus($commandBus); + + $doSomething = new CustomMessage('I am a command'); + + $commandBus->dispatch($doSomething); + + $this->assertSame($doSomething, $commandHandler->getLastMessage()); + } } diff --git a/tests/Plugin/InvokeStrategy/OnEventStrategyTest.php b/tests/Plugin/InvokeStrategy/OnEventStrategyTest.php index e2fa813..84f8064 100644 --- a/tests/Plugin/InvokeStrategy/OnEventStrategyTest.php +++ b/tests/Plugin/InvokeStrategy/OnEventStrategyTest.php @@ -17,6 +17,7 @@ use Prooph\ServiceBus\EventBus; use Prooph\ServiceBus\Plugin\InvokeStrategy\OnEventStrategy; use Prooph\ServiceBus\Plugin\Router\EventRouter; +use ProophTest\ServiceBus\Mock\CustomInvokableMessageHandler; use ProophTest\ServiceBus\Mock\CustomMessage; use ProophTest\ServiceBus\Mock\CustomMessageEventHandler; use Prophecy\Argument; @@ -66,4 +67,28 @@ function () { $onEventStrategy->attachToMessageBus($bus->reveal()); } + + /** + * @test + */ + public function it_should_not_handle_already_processed_messages(): void + { + $eventBus = new EventBus(); + + $onEventStrategy = new OnEventStrategy(); + $onEventStrategy->attachToMessageBus($eventBus); + + $callableHandler = new CustomInvokableMessageHandler(); + + $eventRouter = new EventRouter([ + 'ProophTest\ServiceBus\Mock\CustomMessage' => $callableHandler, + ]); + $eventRouter->attachToMessageBus($eventBus); + + $customEvent = new CustomMessage('I am an event'); + $eventBus->dispatch($customEvent); + + $this->assertSame($customEvent, $callableHandler->getLastMessage()); + $this->assertSame(1, $callableHandler->getInvokeCounter()); + } }