diff --git a/src/EventBus.php b/src/EventBus.php index 884c1dd..ca0637c 100644 --- a/src/EventBus.php +++ b/src/EventBus.php @@ -30,6 +30,11 @@ class EventBus extends MessageBus */ protected $collectExceptions = false; + /** + * @var array + */ + protected $collectedExceptions = []; + public function __construct(ActionEventEmitter $actionEventEmitter = null) { parent::__construct($actionEventEmitter); @@ -58,12 +63,29 @@ function (ActionEvent $actionEvent): void { $actionEvent->setParam(self::EVENT_PARAM_MESSAGE_HANDLED, true); } - if (count($caughtExceptions)) { - throw EventListenerException::collected(...$caughtExceptions); + foreach ($caughtExceptions as $ex) { + $this->collectedExceptions[] = $ex; } }, self::PRIORITY_INVOKE_HANDLER ); + + $this->events->attachListener( + self::EVENT_FINALIZE, + function (ActionEvent $actionEvent): void { + $target = $actionEvent->getTarget(); + + if (empty($target->collectedExceptions)) { + return; + } + + $exceptions = $target->collectedExceptions; + $target->collectedExceptions = []; + + $actionEvent->setParam(MessageBus::EVENT_PARAM_EXCEPTION, EventListenerException::collected(...$exceptions)); + }, + 1000 + ); } /** @@ -99,4 +121,14 @@ public function disableCollectExceptions(): void { $this->collectExceptions = false; } + + public function isCollectingExceptions(): bool + { + return $this->collectExceptions; + } + + public function addCollectedException(\Throwable $e): void + { + $this->collectedExceptions[] = $e; + } } diff --git a/src/Plugin/InvokeStrategy/OnEventStrategy.php b/src/Plugin/InvokeStrategy/OnEventStrategy.php index b160fdf..0f88294 100644 --- a/src/Plugin/InvokeStrategy/OnEventStrategy.php +++ b/src/Plugin/InvokeStrategy/OnEventStrategy.php @@ -24,15 +24,24 @@ public function attachToMessageBus(MessageBus $messageBus): void $this->listenerHandlers[] = $messageBus->attach( MessageBus::EVENT_DISPATCH, function (ActionEvent $actionEvent): void { - if ($actionEvent->getParam(MessageBus::EVENT_PARAM_MESSAGE_HANDLED, false)) { - return; - } - + $target = $actionEvent->getTarget(); $message = $actionEvent->getParam(MessageBus::EVENT_PARAM_MESSAGE); $handlers = $actionEvent->getParam(EventBus::EVENT_PARAM_EVENT_LISTENERS, []); foreach ($handlers as $handler) { - $handler->onEvent($message); + if (is_callable($handler) || ! is_object($handler) || ! is_callable([$handler, 'onEvent'])) { + continue; + } + + try { + $handler->onEvent($message); + } catch (\Throwable $e) { + if ($target->isCollectingExceptions()) { + $target->addCollectedException($e); + } else { + throw $e; + } + } } $actionEvent->setParam(MessageBus::EVENT_PARAM_MESSAGE_HANDLED, true); diff --git a/tests/EventBusTest.php b/tests/EventBusTest.php index 9492b5f..2478fca 100644 --- a/tests/EventBusTest.php +++ b/tests/EventBusTest.php @@ -273,7 +273,7 @@ public function it_collects_exceptions_if_mode_is_enabled(): void MessageBus::EVENT_DISPATCH, function (ActionEvent $e) use ($handler, $errorProducer): void { if ($e->getParam(MessageBus::EVENT_PARAM_MESSAGE_NAME) === CustomMessage::class) { - $e->setParam(EventBus::EVENT_PARAM_EVENT_LISTENERS, [$handler, $errorProducer, $handler]); + $e->setParam(EventBus::EVENT_PARAM_EVENT_LISTENERS, [$handler, $errorProducer, $handler]); } }, MessageBus::PRIORITY_ROUTE diff --git a/tests/Mock/CustomMessageEventHandler2.php b/tests/Mock/CustomMessageEventHandler2.php new file mode 100644 index 0000000..76b5238 --- /dev/null +++ b/tests/Mock/CustomMessageEventHandler2.php @@ -0,0 +1,36 @@ + + * (c) 2015-2017 Sascha-Oliver Prolic + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +declare(strict_types=1); + +namespace ProophTest\ServiceBus\Mock; + +final class CustomMessageEventHandler2 +{ + private $lastMessage; + + private $invokeCounter = 0; + + public function on($message): void + { + $this->lastMessage = $message; + $this->invokeCounter++; + } + + public function getLastMessage() + { + return $this->lastMessage; + } + + public function getInvokeCounter(): int + { + return $this->invokeCounter; + } +} diff --git a/tests/Mock/CustomMessageEventHandlerThrowingExceptions.php b/tests/Mock/CustomMessageEventHandlerThrowingExceptions.php new file mode 100644 index 0000000..e74f761 --- /dev/null +++ b/tests/Mock/CustomMessageEventHandlerThrowingExceptions.php @@ -0,0 +1,21 @@ + + * (c) 2015-2017 Sascha-Oliver Prolic + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +declare(strict_types=1); + +namespace ProophTest\ServiceBus\Mock; + +final class CustomMessageEventHandlerThrowingExceptions +{ + public function onEvent($message): void + { + throw new \Exception('bar'); + } +} diff --git a/tests/Mock/CustomOnEventStrategy.php b/tests/Mock/CustomOnEventStrategy.php new file mode 100644 index 0000000..6449adf --- /dev/null +++ b/tests/Mock/CustomOnEventStrategy.php @@ -0,0 +1,52 @@ + + * (c) 2015-2017 Sascha-Oliver Prolic + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +declare(strict_types=1); + +namespace ProophTest\ServiceBus\Mock; + +use Prooph\Common\Event\ActionEvent; +use Prooph\ServiceBus\EventBus; +use Prooph\ServiceBus\MessageBus; +use Prooph\ServiceBus\Plugin\AbstractPlugin; + +final class CustomOnEventStrategy extends AbstractPlugin +{ + public function attachToMessageBus(MessageBus $messageBus): void + { + $this->listenerHandlers[] = $messageBus->attach( + MessageBus::EVENT_DISPATCH, + function (ActionEvent $actionEvent): void { + $target = $actionEvent->getTarget(); + $message = $actionEvent->getParam(MessageBus::EVENT_PARAM_MESSAGE); + $handlers = $actionEvent->getParam(EventBus::EVENT_PARAM_EVENT_LISTENERS, []); + + foreach ($handlers as $handler) { + if (is_callable($handler) || ! is_object($handler) || ! is_callable([$handler, 'on'])) { + continue; + } + + try { + $handler->on($message); + } catch (\Throwable $e) { + if ($target->isCollectingExceptions()) { + $target->addCollectedException($e); + } else { + throw $e; + } + } + } + + $actionEvent->setParam(MessageBus::EVENT_PARAM_MESSAGE_HANDLED, true); + }, + MessageBus::PRIORITY_INVOKE_HANDLER + ); + } +} diff --git a/tests/Plugin/InvokeStrategy/OnEventStrategyTest.php b/tests/Plugin/InvokeStrategy/OnEventStrategyTest.php index 84f8064..b4e3c57 100644 --- a/tests/Plugin/InvokeStrategy/OnEventStrategyTest.php +++ b/tests/Plugin/InvokeStrategy/OnEventStrategyTest.php @@ -15,11 +15,17 @@ use PHPUnit\Framework\TestCase; use Prooph\Common\Event\DefaultListenerHandler; use Prooph\ServiceBus\EventBus; +use Prooph\ServiceBus\Exception\EventListenerException; +use Prooph\ServiceBus\Exception\MessageDispatchException; use Prooph\ServiceBus\Plugin\InvokeStrategy\OnEventStrategy; +use Prooph\ServiceBus\Plugin\ListenerExceptionCollectionMode; use Prooph\ServiceBus\Plugin\Router\EventRouter; use ProophTest\ServiceBus\Mock\CustomInvokableMessageHandler; use ProophTest\ServiceBus\Mock\CustomMessage; use ProophTest\ServiceBus\Mock\CustomMessageEventHandler; +use ProophTest\ServiceBus\Mock\CustomMessageEventHandler2; +use ProophTest\ServiceBus\Mock\CustomMessageEventHandlerThrowingExceptions; +use ProophTest\ServiceBus\Mock\CustomOnEventStrategy; use Prophecy\Argument; class OnEventStrategyTest extends TestCase @@ -60,7 +66,7 @@ public function it_can_be_attached_to_event_bus(): void ->shouldBeCalled() ->willReturn( new DefaultListenerHandler( - function () { + function (): void { } ) ); @@ -91,4 +97,143 @@ public function it_should_not_handle_already_processed_messages(): void $this->assertSame($customEvent, $callableHandler->getLastMessage()); $this->assertSame(1, $callableHandler->getInvokeCounter()); } + + /** + * @test + */ + public function it_should_still_work_with_callables(): void + { + $eventBus = new EventBus(); + + $onEventStrategy = new OnEventStrategy(); + $onEventStrategy->attachToMessageBus($eventBus); + + $handler = new CustomMessageEventHandler(); + + $result = false; + + $router = new EventRouter(); + $router->route(CustomMessage::class) + ->to(function (CustomMessage $message) use (&$result): void { + $result = true; + }) + ->andTo($handler); + + $router->attachToMessageBus($eventBus); + + $eventBus->dispatch(new CustomMessage('some text')); + + $this->assertTrue($result); + $this->assertSame(1, $handler->getInvokeCounter()); + } + + /** + * @test + */ + public function it_should_still_work_with_callables_and_collect_all_exceptions(): void + { + $eventBus = new EventBus(); + + $exceptionModePlugin = new ListenerExceptionCollectionMode(); + $exceptionModePlugin->attachToMessageBus($eventBus); + + $onEventStrategy = new OnEventStrategy(); + $onEventStrategy->attachToMessageBus($eventBus); + + $handler = new CustomMessageEventHandlerThrowingExceptions(); + + $router = new EventRouter(); + $router->route(CustomMessage::class) + ->to(function (CustomMessage $message): void { + throw new \Exception('foo'); + }) + ->andTo($handler); + + $router->attachToMessageBus($eventBus); + + $ex = null; + + try { + $eventBus->dispatch(new CustomMessage('some text')); + } catch (MessageDispatchException $ex) { + $ex = $ex->getPrevious(); + } + + $this->assertNotNull($ex); + $this->assertInstanceOf(EventListenerException::class, $ex); + $this->assertCount(2, $ex->listenerExceptions()); + } + + /** + * @test + */ + public function it_should_still_work_with_callables_and_collect_all_exceptions_part2(): void + { + $eventBus = new EventBus(); + + $exceptionModePlugin = new ListenerExceptionCollectionMode(); + $exceptionModePlugin->attachToMessageBus($eventBus); + + $onEventStrategy = new OnEventStrategy(); + $onEventStrategy->attachToMessageBus($eventBus); + + $handler = new CustomMessageEventHandlerThrowingExceptions(); + + $router = new EventRouter(); + $router->route(CustomMessage::class) + ->to(function (CustomMessage $message): void { + throw new \Exception('foo'); + }) + ->andTo($handler) + ->andTo($handler); + + $router->attachToMessageBus($eventBus); + + $ex = null; + + try { + $eventBus->dispatch(new CustomMessage('some text')); + } catch (MessageDispatchException $ex) { + $ex = $ex->getPrevious(); + } + + $this->assertNotNull($ex); + $this->assertInstanceOf(EventListenerException::class, $ex); + $this->assertCount(3, $ex->listenerExceptions()); + } + + /** + * @test + */ + public function it_should_still_work_with_callables_and_other_strategies(): void + { + $eventBus = new EventBus(); + + $onEventStrategy = new OnEventStrategy(); + $onEventStrategy->attachToMessageBus($eventBus); + + $secondOnEventStrategy = new CustomOnEventStrategy(); + $secondOnEventStrategy->attachToMessageBus($eventBus); + + $handler = new CustomMessageEventHandler(); + $handler2 = new CustomMessageEventHandler2(); + + $result = false; + + $router = new EventRouter(); + $router->route(CustomMessage::class) + ->to(function (CustomMessage $message) use (&$result): void { + $result = true; + }) + ->andTo($handler) + ->andTo($handler2); + + $router->attachToMessageBus($eventBus); + + $eventBus->dispatch(new CustomMessage('some text')); + + $this->assertTrue($result); + $this->assertSame(1, $handler->getInvokeCounter()); + $this->assertSame(1, $handler2->getInvokeCounter()); + } }