diff --git a/src/Plugin/Router/AsyncSwitchMessageRouter.php b/src/Plugin/Router/AsyncSwitchMessageRouter.php index 290bc82..d37ec27 100644 --- a/src/Plugin/Router/AsyncSwitchMessageRouter.php +++ b/src/Plugin/Router/AsyncSwitchMessageRouter.php @@ -18,6 +18,7 @@ use Prooph\ServiceBus\Async\MessageProducer; use Prooph\ServiceBus\CommandBus; use Prooph\ServiceBus\EventBus; +use Prooph\ServiceBus\Exception\RuntimeException; use Prooph\ServiceBus\MessageBus; use Prooph\ServiceBus\Plugin\AbstractPlugin; use Prooph\ServiceBus\QueryBus; @@ -71,9 +72,11 @@ public function onRouteMessage(ActionEvent $actionEvent): void if ($actionEvent->getTarget() instanceof CommandBus || $actionEvent->getTarget() instanceof QueryBus) { $actionEvent->setParam(MessageBus::EVENT_PARAM_MESSAGE_HANDLER, $this->asyncMessageProducer); - } else { + } elseif ($actionEvent->getTarget() instanceof EventBus) { //Target is an event bus so we set message producer as the only listener of the message $actionEvent->setParam(EventBus::EVENT_PARAM_EVENT_LISTENERS, [$this->asyncMessageProducer]); + } else { + throw new RuntimeException('Unexpected bus implementation. This plugin is only compatible with standard CommandBus, QueryBus and EventBus implementations.'); } return; diff --git a/tests/Plugin/Router/AsyncSwitchMessageRouterTest.php b/tests/Plugin/Router/AsyncSwitchMessageRouterTest.php index c66c92a..93fc063 100644 --- a/tests/Plugin/Router/AsyncSwitchMessageRouterTest.php +++ b/tests/Plugin/Router/AsyncSwitchMessageRouterTest.php @@ -18,6 +18,7 @@ use Prooph\ServiceBus\Async\MessageProducer; use Prooph\ServiceBus\CommandBus; use Prooph\ServiceBus\EventBus; +use Prooph\ServiceBus\Exception\RuntimeException; use Prooph\ServiceBus\MessageBus; use Prooph\ServiceBus\Plugin\Router\AsyncSwitchMessageRouter; use Prooph\ServiceBus\Plugin\Router\EventRouter; @@ -196,4 +197,34 @@ public function it_sets_message_producer_as_event_listener_if_target_is_an_event $this->assertArrayHasKey('handled-async', $updatedMessage->metadata()); $this->assertTrue($updatedMessage->metadata()['handled-async']); } + + /** + * @test + */ + public function it_throws_exception_if_target_is_unknown_bus(): void + { + $messageProducer = $this->prophesize(MessageProducer::class); + + $message = AsyncEvent::createEvent('test-data'); + + $actionEvent = new DefaultActionEvent( + MessageBus::EVENT_DISPATCH, + $this->prophesize(MessageBus::class), + [ + MessageBus::EVENT_PARAM_MESSAGE_NAME => get_class($message), + MessageBus::EVENT_PARAM_MESSAGE => $message, + ] + ); + + $router = new AsyncSwitchMessageRouter(new EventRouter(), $messageProducer->reveal()); + try { + $router->onRouteMessage($actionEvent); + $this->fail(); + } catch (RuntimeException $exception) { + } + + $updatedMessage = $actionEvent->getParam(MessageBus::EVENT_PARAM_MESSAGE); + $this->assertArrayHasKey('handled-async', $updatedMessage->metadata()); + $this->assertTrue($updatedMessage->metadata()['handled-async']); + } }