diff --git a/.gitignore b/.gitignore index 35e1c90..dcd7967 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ /vendor /var /composer.lock -/.php_cs.cache \ No newline at end of file +/.php_cs.cache +/.phpunit.result.cache diff --git a/.phpunit.result.cache b/.phpunit.result.cache deleted file mode 100644 index 1a723ec..0000000 --- a/.phpunit.result.cache +++ /dev/null @@ -1 +0,0 @@ -{"version":1,"defects":{"Drift\\CommandBus\\Tests\\Bus\\QueryHandlerTest::testQueryBus":4,"Drift\\CommandBus\\Tests\\Bus\\QueryHandlerTest::testBusesInjection":5,"Drift\\CommandBus\\Tests\\Bus\\CommandHandlerTest::testBusesInjection":4,"Drift\\CommandBus\\Tests\\Bus\\AsyncCommandHandlerTest::testQueryBus":4},"times":{"Drift\\CommandBus\\Tests\\Bus\\DiscriminableBusTest::testQueryBus":0.003,"Drift\\CommandBus\\Tests\\Bus\\QueryHandlerTest::testQueryBus":0.002,"Drift\\CommandBus\\Tests\\Bus\\QueryHandlerTest::testBadCommand":0,"Drift\\CommandBus\\Tests\\Bus\\QueryHandlerTest::testBusesInjection":0,"Drift\\CommandBus\\Tests\\Bus\\CommandHandlerTest::testQueryBus":0.002,"Drift\\CommandBus\\Tests\\Bus\\CommandHandlerTest::testBusesInjection":0.001,"Drift\\CommandBus\\Tests\\Bus\\AsyncCommandHandlerTest::testQueryBus":0,"Drift\\CommandBus\\Tests\\Bus\\AsyncCommandHandlerTest::testBusesInjection":0.002}} \ No newline at end of file diff --git a/Async/AMQPAdapter.php b/Async/AMQPAdapter.php index 63e4398..2e24137 100644 --- a/Async/AMQPAdapter.php +++ b/Async/AMQPAdapter.php @@ -19,6 +19,7 @@ use Bunny\Exception\ClientException; use Bunny\Message; use Drift\CommandBus\Bus\CommandBus; +use Drift\CommandBus\Bus\NonRecoverableCommand; use Drift\CommandBus\Console\CommandBusHeaderMessage; use Drift\CommandBus\Console\CommandBusLineMessage; use Drift\CommandBus\Exception\InvalidCommandException; @@ -179,16 +180,20 @@ public function consume( return $this ->channel ->consume(function (Message $message, Channel $channel) use ($bus, $outputPrinter, &$forced) { + $command = unserialize($message->content); + return $this ->executeCommand( $bus, - unserialize($message->content), + $command, $outputPrinter, - function () use ($message, $channel) { + function () use ($channel, $message) { return $channel->ack($message); }, - function () use ($message, $channel) { - return $channel->nack($message); + function () use ($command, $channel, $message) { + return $command instanceof NonRecoverableCommand + ? $channel->ack($message) + : $channel->nack($message); }, function () use (&$forced) { $forced = true; diff --git a/Async/InMemoryAdapter.php b/Async/InMemoryAdapter.php index 9eca1a9..e3aa13a 100644 --- a/Async/InMemoryAdapter.php +++ b/Async/InMemoryAdapter.php @@ -17,6 +17,7 @@ use function Clue\React\Block\await; use Drift\CommandBus\Bus\CommandBus; +use Drift\CommandBus\Bus\NonRecoverableCommand; use Drift\CommandBus\Console\CommandBusLineMessage; use Drift\CommandBus\Exception\InvalidCommandException; use Drift\Console\OutputPrinter; @@ -140,13 +141,20 @@ public function consume( $this->resetIterations($limit); foreach ($this->queue as $key => $command) { + $notShouldRecover = !$command instanceof NonRecoverableCommand; + if ($notShouldRecover) { + unset($this->queue[$key]); + } + $promise = $this ->executeCommand( $bus, $command, $outputPrinter, - function () use ($key) { - unset($this->queue[$key]); + function () use ($key, $notShouldRecover) { + if (!$notShouldRecover) { + unset($this->queue[$key]); + } }, function () {}, function () { diff --git a/Async/PostgreSQLAdapter.php b/Async/PostgreSQLAdapter.php index b0f5d6f..f690489 100644 --- a/Async/PostgreSQLAdapter.php +++ b/Async/PostgreSQLAdapter.php @@ -17,6 +17,7 @@ use function Clue\React\Block\await; use Drift\CommandBus\Bus\CommandBus; +use Drift\CommandBus\Bus\NonRecoverableCommand; use Drift\CommandBus\Console\CommandBusHeaderMessage; use Drift\CommandBus\Console\CommandBusLineMessage; use Drift\CommandBus\Exception\InvalidCommandException; @@ -233,12 +234,19 @@ public function consume( return true; } + $command = unserialize(stripslashes($message['payload'])); + return $this->executeCommand( $bus, - unserialize(stripslashes($message['payload'])), + $command, $outputPrinter, function () {}, - function () {}, + function () use ($command) { + $shouldRecover = !$command instanceof NonRecoverableCommand; + if ($shouldRecover) { + return $this->enqueue($command); + } + }, function () use (&$forced) { $this ->loop @@ -279,13 +287,19 @@ private function consumeAvailableElements( return true; } + $command = unserialize(stripslashes($message['payload'])); + return $this->executeCommand( $bus, - unserialize(stripslashes($message['payload'])), + $command, $outputPrinter, function () { }, - function () { + function () use ($command) { + $shouldRecover = !$command instanceof NonRecoverableCommand; + if ($shouldRecover) { + return $this->enqueue($command); + } }, function () { return false; diff --git a/Async/RedisAdapter.php b/Async/RedisAdapter.php index d874f39..442c654 100644 --- a/Async/RedisAdapter.php +++ b/Async/RedisAdapter.php @@ -18,6 +18,7 @@ use function Clue\React\Block\await; use Clue\React\Redis\Client; use Drift\CommandBus\Bus\CommandBus; +use Drift\CommandBus\Bus\NonRecoverableCommand; use Drift\CommandBus\Console\CommandBusLineMessage; use Drift\CommandBus\Exception\InvalidCommandException; use Drift\Console\OutputPrinter; @@ -148,12 +149,19 @@ public function consume( ->redis ->blPop($this->key, 0) ->then(function (array $job) use ($bus, $outputPrinter) { + $command = unserialize($job[1]); + return $this->executeCommand( $bus, - unserialize($job[1]), + $command, $outputPrinter, function () {}, - function () {}, + function () use ($command) { + $shouldRecover = !$command instanceof NonRecoverableCommand; + if ($shouldRecover) { + return $this->enqueue($command); + } + }, function () { return true; } diff --git a/Bus/CommandBus.php b/Bus/CommandBus.php index 48a69e8..6393880 100644 --- a/Bus/CommandBus.php +++ b/Bus/CommandBus.php @@ -17,6 +17,7 @@ use Drift\CommandBus\Exception\InvalidCommandException; use React\Promise\PromiseInterface; +use function React\Promise\reject; /** * Class CommandBus. @@ -34,10 +35,14 @@ class CommandBus extends Bus */ public function execute($command): PromiseInterface { - return $this - ->handle($command) - ->then(function () { - return; - }); + try { + return $this + ->handle($command) + ->then(function () { + return; + }); + } catch (\Exception $exception) { + return reject($exception); + } } } diff --git a/Bus/NonRecoverableCommand.php b/Bus/NonRecoverableCommand.php new file mode 100644 index 0000000..90d11d8 --- /dev/null +++ b/Bus/NonRecoverableCommand.php @@ -0,0 +1,20 @@ + + */ + +declare(strict_types=1); + +namespace Drift\CommandBus\Bus; + +interface NonRecoverableCommand +{ +} diff --git a/Bus/QueryBus.php b/Bus/QueryBus.php index d70437f..f1956ba 100644 --- a/Bus/QueryBus.php +++ b/Bus/QueryBus.php @@ -17,6 +17,7 @@ use Drift\CommandBus\Exception\InvalidCommandException; use React\Promise\PromiseInterface; +use function React\Promise\reject; /** * Class QueryBus. @@ -34,6 +35,10 @@ class QueryBus extends Bus */ public function ask($query): PromiseInterface { - return $this->handle($query); + try { + return $this->handle($query); + } catch (\Exception $exception) { + return reject($exception); + } } } diff --git a/Console/CommandConsumerCommand.php b/Console/CommandConsumerCommand.php index 82f7e01..e5b910d 100644 --- a/Console/CommandConsumerCommand.php +++ b/Console/CommandConsumerCommand.php @@ -29,15 +29,8 @@ */ class CommandConsumerCommand extends Command { - /** - * @var AsyncAdapter - */ - private $asyncAdapter; - - /** - * @var InlineCommandBus - */ - private $commandBus; + private AsyncAdapter $asyncAdapter; + private InlineCommandBus $commandBus; /** * ConsumeCommand constructor. diff --git a/Tests/Async/AsyncAdapterTest.php b/Tests/Async/AsyncAdapterTest.php index fe37604..b6c5c80 100644 --- a/Tests/Async/AsyncAdapterTest.php +++ b/Tests/Async/AsyncAdapterTest.php @@ -22,9 +22,15 @@ use Drift\CommandBus\Tests\Command\ChangeAThing; use Drift\CommandBus\Tests\Command\ChangeBThing; use Drift\CommandBus\Tests\Command\ChangeYetAnotherThing; +use Drift\CommandBus\Tests\Command\NotRecoverableCommand; +use Drift\CommandBus\Tests\Command\RejectException; +use Drift\CommandBus\Tests\Command\ThrowException; use Drift\CommandBus\Tests\CommandHandler\ChangeAnotherThingHandler; use Drift\CommandBus\Tests\CommandHandler\ChangeAThingHandler; use Drift\CommandBus\Tests\CommandHandler\ChangeYetAnotherThingHandler; +use Drift\CommandBus\Tests\CommandHandler\NotRecoverableCommandHandler; +use Drift\CommandBus\Tests\CommandHandler\RejectExceptionHandler; +use Drift\CommandBus\Tests\CommandHandler\ThrowExceptionHandler; use Drift\CommandBus\Tests\Context; use Drift\CommandBus\Tests\Middleware\Middleware1; @@ -62,6 +68,24 @@ protected static function decorateConfiguration(array $configuration): array ], ]; + $configuration['services'][ThrowExceptionHandler::class] = [ + 'tags' => [ + ['name' => 'command_handler', 'method' => 'handle'], + ], + ]; + + $configuration['services'][RejectExceptionHandler::class] = [ + 'tags' => [ + ['name' => 'command_handler', 'method' => 'handle'], + ], + ]; + + $configuration['services'][NotRecoverableCommandHandler::class] = [ + 'tags' => [ + ['name' => 'command_handler', 'method' => 'handle'], + ], + ]; + $configuration['imports'] = [ ['resource' => __DIR__.'/../autowiring.yml'], ]; @@ -192,14 +216,14 @@ public function testAsyncCommands() 'command-bus:consume-commands', ]); - usleep(200000); + usleep(500000); $promise1 = $this ->getCommandBus() ->execute(new ChangeAThing('thing')); await($promise1, $this->getLoop()); - usleep(200000); + usleep(500000); $promises = []; $promises[] = $this @@ -211,7 +235,7 @@ public function testAsyncCommands() ->execute(new ChangeYetAnotherThing('thing')); awaitAll($promises, $this->getLoop()); - usleep(200000); + usleep(500000); $output = $process->getOutput(); $this->assertStringContainsString("\033[01;32mConsumed\033[0m ChangeAThing", $output); $this->assertStringContainsString("\033[01;32mConsumed\033[0m ChangeAnotherThing", $output); @@ -219,6 +243,70 @@ public function testAsyncCommands() $process->stop(); } + public function testThrowException() + { + $this->resetInfrastructure(); + + $process = $this->runAsyncCommand([ + 'command-bus:consume-commands', + ]); + + usleep(500000); + + $promise = $this + ->getCommandBus() + ->execute(new ThrowException()); + + await($promise, $this->getLoop()); + usleep(500000); + $output = $process->getOutput(); + $this->assertTrue(substr_count($output, 'Rejected') > 1); + $process->stop(); + } + + public function testReturnRejectedPromise() + { + $this->resetInfrastructure(); + + $process = $this->runAsyncCommand([ + 'command-bus:consume-commands', + ]); + + usleep(500000); + + $promise = $this + ->getCommandBus() + ->execute(new RejectException()); + + await($promise, $this->getLoop()); + usleep(500000); + $output = $process->getOutput(); + $this->assertTrue(substr_count($output, 'Rejected') > 1); + $process->stop(); + } + + public function testNoRecoverableCommand() + { + $this->resetInfrastructure(); + + $process = $this->runAsyncCommand([ + 'command-bus:consume-commands', + ]); + + usleep(500000); + + $promise = $this + ->getCommandBus() + ->execute(new NotRecoverableCommand()); + + await($promise, $this->getLoop()); + usleep(500000); + $output = $process->getOutput(); + $this->assertTrue(1 === substr_count($output, 'Rejected')); + $this->assertStringContainsString("\033[01;31mRejected\033[0m NotRecoverableCommand", $output); + $process->stop(); + } + /** * Reset infrastructure. * @@ -274,7 +362,10 @@ protected function consumeCommands(int $limit): string usleep(100000); } - return $process->getOutput(); + $output = $process->getOutput(); + $process->stop(); + + return $output; } /** diff --git a/Tests/Async/AsyncTest.php b/Tests/Async/AsyncTest.php index 2e21433..5267988 100644 --- a/Tests/Async/AsyncTest.php +++ b/Tests/Async/AsyncTest.php @@ -68,8 +68,6 @@ protected static function decorateConfiguration(array $configuration): array /** * Test buses are being built. - * - * @group async */ public function testCommandBus() { diff --git a/Tests/Async/DefaultAsyncTest.php b/Tests/Async/DefaultAsyncTest.php index 075ef84..e7d6410 100644 --- a/Tests/Async/DefaultAsyncTest.php +++ b/Tests/Async/DefaultAsyncTest.php @@ -59,8 +59,6 @@ protected static function decorateConfiguration(array $configuration): array /** * Test buses are being built. - * - * @group async */ public function testCommandBusMiddlewares() { diff --git a/Tests/Async/InMemoryAsyncTest.php b/Tests/Async/InMemoryAsyncTest.php index 5d0f683..ce7d391 100644 --- a/Tests/Async/InMemoryAsyncTest.php +++ b/Tests/Async/InMemoryAsyncTest.php @@ -49,6 +49,21 @@ public function testAsyncCommands() $this->markTestSkipped('InMemory adapter should not fire this test'); } + public function testThrowException() + { + $this->markTestSkipped('InMemory adapter should not fire this test'); + } + + public function testReturnRejectedPromise() + { + $this->markTestSkipped('InMemory adapter should not fire this test'); + } + + public function testNoRecoverableCommand() + { + $this->markTestSkipped('InMemory adapter should not fire this test'); + } + /** * Consume commands. * diff --git a/Tests/Command/NotRecoverableCommand.php b/Tests/Command/NotRecoverableCommand.php new file mode 100644 index 0000000..45e01a4 --- /dev/null +++ b/Tests/Command/NotRecoverableCommand.php @@ -0,0 +1,22 @@ + + */ + +declare(strict_types=1); + +namespace Drift\CommandBus\Tests\Command; + +use Drift\CommandBus\Bus\NonRecoverableCommand; + +class NotRecoverableCommand implements NonRecoverableCommand +{ +} diff --git a/Tests/Command/RejectException.php b/Tests/Command/RejectException.php new file mode 100644 index 0000000..6a8a9a9 --- /dev/null +++ b/Tests/Command/RejectException.php @@ -0,0 +1,20 @@ + + */ + +declare(strict_types=1); + +namespace Drift\CommandBus\Tests\Command; + +class RejectException +{ +} diff --git a/Tests/Command/ThrowException.php b/Tests/Command/ThrowException.php new file mode 100644 index 0000000..8b57b9d --- /dev/null +++ b/Tests/Command/ThrowException.php @@ -0,0 +1,20 @@ + + */ + +declare(strict_types=1); + +namespace Drift\CommandBus\Tests\Command; + +class ThrowException +{ +} diff --git a/Tests/CommandHandler/NotRecoverableCommandHandler.php b/Tests/CommandHandler/NotRecoverableCommandHandler.php new file mode 100644 index 0000000..bb13182 --- /dev/null +++ b/Tests/CommandHandler/NotRecoverableCommandHandler.php @@ -0,0 +1,31 @@ + + */ + +declare(strict_types=1); + +namespace Drift\CommandBus\Tests\CommandHandler; + +use Drift\CommandBus\Tests\Command\NotRecoverableCommand; + +class NotRecoverableCommandHandler +{ + /** + * @param NotRecoverableCommand $notRecoverableCommand + * + * @throws \Exception + */ + public function handle(NotRecoverableCommand $notRecoverableCommand) + { + throw new \Exception(); + } +} diff --git a/Tests/CommandHandler/RejectExceptionHandler.php b/Tests/CommandHandler/RejectExceptionHandler.php new file mode 100644 index 0000000..98142e3 --- /dev/null +++ b/Tests/CommandHandler/RejectExceptionHandler.php @@ -0,0 +1,33 @@ + + */ + +declare(strict_types=1); + +namespace Drift\CommandBus\Tests\CommandHandler; + +use Drift\CommandBus\Tests\Command\RejectException; +use function React\Promise\reject; + +/** + * Class RejectExceptionHandler. + */ +class RejectExceptionHandler +{ + /** + * Handle. + */ + public function handle(RejectException $rejectException) + { + return reject(new \Exception()); + } +} diff --git a/Tests/CommandHandler/ThrowExceptionHandler.php b/Tests/CommandHandler/ThrowExceptionHandler.php new file mode 100644 index 0000000..5c5875f --- /dev/null +++ b/Tests/CommandHandler/ThrowExceptionHandler.php @@ -0,0 +1,32 @@ + + */ + +declare(strict_types=1); + +namespace Drift\CommandBus\Tests\CommandHandler; + +use Drift\CommandBus\Tests\Command\ThrowException; + +/** + * Class ThrowExceptionHandler. + */ +class ThrowExceptionHandler +{ + /** + * Handle. + */ + public function handle(ThrowException $throwException) + { + throw new \Exception(); + } +}