Skip to content

Commit

Permalink
Some changes to command bus
Browse files Browse the repository at this point in the history
- Added interface for non recoverable commands
- Treated properly exceptions (cached)
  • Loading branch information
mmoreram committed Oct 15, 2021
1 parent 79d9826 commit f8c6281
Show file tree
Hide file tree
Showing 20 changed files with 355 additions and 37 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
/vendor
/var
/composer.lock
/.php_cs.cache
/.php_cs.cache
/.phpunit.result.cache
1 change: 0 additions & 1 deletion .phpunit.result.cache

This file was deleted.

13 changes: 9 additions & 4 deletions Async/AMQPAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
use Bunny\Exception\ClientException;
use Bunny\Message;
use Drift\CommandBus\Bus\CommandBus;
use Drift\CommandBus\Bus\NonRecoverableCommand;
use Drift\CommandBus\Console\CommandBusHeaderMessage;
use Drift\CommandBus\Console\CommandBusLineMessage;
use Drift\CommandBus\Exception\InvalidCommandException;
Expand Down Expand Up @@ -179,16 +180,20 @@ public function consume(
return $this
->channel
->consume(function (Message $message, Channel $channel) use ($bus, $outputPrinter, &$forced) {
$command = unserialize($message->content);

return $this
->executeCommand(
$bus,
unserialize($message->content),
$command,
$outputPrinter,
function () use ($message, $channel) {
function () use ($channel, $message) {
return $channel->ack($message);
},
function () use ($message, $channel) {
return $channel->nack($message);
function () use ($command, $channel, $message) {
return $command instanceof NonRecoverableCommand
? $channel->ack($message)
: $channel->nack($message);
},
function () use (&$forced) {
$forced = true;
Expand Down
12 changes: 10 additions & 2 deletions Async/InMemoryAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

use function Clue\React\Block\await;
use Drift\CommandBus\Bus\CommandBus;
use Drift\CommandBus\Bus\NonRecoverableCommand;
use Drift\CommandBus\Console\CommandBusLineMessage;
use Drift\CommandBus\Exception\InvalidCommandException;
use Drift\Console\OutputPrinter;
Expand Down Expand Up @@ -140,13 +141,20 @@ public function consume(
$this->resetIterations($limit);

foreach ($this->queue as $key => $command) {
$notShouldRecover = !$command instanceof NonRecoverableCommand;
if ($notShouldRecover) {
unset($this->queue[$key]);
}

$promise = $this
->executeCommand(
$bus,
$command,
$outputPrinter,
function () use ($key) {
unset($this->queue[$key]);
function () use ($key, $notShouldRecover) {
if (!$notShouldRecover) {
unset($this->queue[$key]);
}
},
function () {},
function () {
Expand Down
22 changes: 18 additions & 4 deletions Async/PostgreSQLAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

use function Clue\React\Block\await;
use Drift\CommandBus\Bus\CommandBus;
use Drift\CommandBus\Bus\NonRecoverableCommand;
use Drift\CommandBus\Console\CommandBusHeaderMessage;
use Drift\CommandBus\Console\CommandBusLineMessage;
use Drift\CommandBus\Exception\InvalidCommandException;
Expand Down Expand Up @@ -233,12 +234,19 @@ public function consume(
return true;
}

$command = unserialize(stripslashes($message['payload']));

return $this->executeCommand(
$bus,
unserialize(stripslashes($message['payload'])),
$command,
$outputPrinter,
function () {},
function () {},
function () use ($command) {
$shouldRecover = !$command instanceof NonRecoverableCommand;
if ($shouldRecover) {
return $this->enqueue($command);
}
},
function () use (&$forced) {
$this
->loop
Expand Down Expand Up @@ -279,13 +287,19 @@ private function consumeAvailableElements(
return true;
}

$command = unserialize(stripslashes($message['payload']));

return $this->executeCommand(
$bus,
unserialize(stripslashes($message['payload'])),
$command,
$outputPrinter,
function () {
},
function () {
function () use ($command) {
$shouldRecover = !$command instanceof NonRecoverableCommand;
if ($shouldRecover) {
return $this->enqueue($command);
}
},
function () {
return false;
Expand Down
12 changes: 10 additions & 2 deletions Async/RedisAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
use function Clue\React\Block\await;
use Clue\React\Redis\Client;
use Drift\CommandBus\Bus\CommandBus;
use Drift\CommandBus\Bus\NonRecoverableCommand;
use Drift\CommandBus\Console\CommandBusLineMessage;
use Drift\CommandBus\Exception\InvalidCommandException;
use Drift\Console\OutputPrinter;
Expand Down Expand Up @@ -148,12 +149,19 @@ public function consume(
->redis
->blPop($this->key, 0)
->then(function (array $job) use ($bus, $outputPrinter) {
$command = unserialize($job[1]);

return $this->executeCommand(
$bus,
unserialize($job[1]),
$command,
$outputPrinter,
function () {},
function () {},
function () use ($command) {
$shouldRecover = !$command instanceof NonRecoverableCommand;
if ($shouldRecover) {
return $this->enqueue($command);
}
},
function () {
return true;
}
Expand Down
15 changes: 10 additions & 5 deletions Bus/CommandBus.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

use Drift\CommandBus\Exception\InvalidCommandException;
use React\Promise\PromiseInterface;
use function React\Promise\reject;

/**
* Class CommandBus.
Expand All @@ -34,10 +35,14 @@ class CommandBus extends Bus
*/
public function execute($command): PromiseInterface
{
return $this
->handle($command)
->then(function () {
return;
});
try {
return $this
->handle($command)
->then(function () {
return;
});
} catch (\Exception $exception) {
return reject($exception);
}
}
}
20 changes: 20 additions & 0 deletions Bus/NonRecoverableCommand.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<?php

/*
* This file is part of the DriftPHP Project
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*
* Feel free to edit as you please, and have fun.
*
* @author Marc Morera <yuhu@mmoreram.com>
*/

declare(strict_types=1);

namespace Drift\CommandBus\Bus;

interface NonRecoverableCommand
{
}
7 changes: 6 additions & 1 deletion Bus/QueryBus.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

use Drift\CommandBus\Exception\InvalidCommandException;
use React\Promise\PromiseInterface;
use function React\Promise\reject;

/**
* Class QueryBus.
Expand All @@ -34,6 +35,10 @@ class QueryBus extends Bus
*/
public function ask($query): PromiseInterface
{
return $this->handle($query);
try {
return $this->handle($query);
} catch (\Exception $exception) {
return reject($exception);
}
}
}
11 changes: 2 additions & 9 deletions Console/CommandConsumerCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,8 @@
*/
class CommandConsumerCommand extends Command
{
/**
* @var AsyncAdapter
*/
private $asyncAdapter;

/**
* @var InlineCommandBus
*/
private $commandBus;
private AsyncAdapter $asyncAdapter;
private InlineCommandBus $commandBus;

/**
* ConsumeCommand constructor.
Expand Down
99 changes: 95 additions & 4 deletions Tests/Async/AsyncAdapterTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,15 @@
use Drift\CommandBus\Tests\Command\ChangeAThing;
use Drift\CommandBus\Tests\Command\ChangeBThing;
use Drift\CommandBus\Tests\Command\ChangeYetAnotherThing;
use Drift\CommandBus\Tests\Command\NotRecoverableCommand;
use Drift\CommandBus\Tests\Command\RejectException;
use Drift\CommandBus\Tests\Command\ThrowException;
use Drift\CommandBus\Tests\CommandHandler\ChangeAnotherThingHandler;
use Drift\CommandBus\Tests\CommandHandler\ChangeAThingHandler;
use Drift\CommandBus\Tests\CommandHandler\ChangeYetAnotherThingHandler;
use Drift\CommandBus\Tests\CommandHandler\NotRecoverableCommandHandler;
use Drift\CommandBus\Tests\CommandHandler\RejectExceptionHandler;
use Drift\CommandBus\Tests\CommandHandler\ThrowExceptionHandler;
use Drift\CommandBus\Tests\Context;
use Drift\CommandBus\Tests\Middleware\Middleware1;

Expand Down Expand Up @@ -62,6 +68,24 @@ protected static function decorateConfiguration(array $configuration): array
],
];

$configuration['services'][ThrowExceptionHandler::class] = [
'tags' => [
['name' => 'command_handler', 'method' => 'handle'],
],
];

$configuration['services'][RejectExceptionHandler::class] = [
'tags' => [
['name' => 'command_handler', 'method' => 'handle'],
],
];

$configuration['services'][NotRecoverableCommandHandler::class] = [
'tags' => [
['name' => 'command_handler', 'method' => 'handle'],
],
];

$configuration['imports'] = [
['resource' => __DIR__.'/../autowiring.yml'],
];
Expand Down Expand Up @@ -192,14 +216,14 @@ public function testAsyncCommands()
'command-bus:consume-commands',
]);

usleep(200000);
usleep(500000);

$promise1 = $this
->getCommandBus()
->execute(new ChangeAThing('thing'));

await($promise1, $this->getLoop());
usleep(200000);
usleep(500000);

$promises = [];
$promises[] = $this
Expand All @@ -211,14 +235,78 @@ public function testAsyncCommands()
->execute(new ChangeYetAnotherThing('thing'));

awaitAll($promises, $this->getLoop());
usleep(200000);
usleep(500000);
$output = $process->getOutput();
$this->assertStringContainsString("\033[01;32mConsumed\033[0m ChangeAThing", $output);
$this->assertStringContainsString("\033[01;32mConsumed\033[0m ChangeAnotherThing", $output);
$this->assertStringContainsString("\033[01;32mConsumed\033[0m ChangeYetAnotherThing", $output);
$process->stop();
}

public function testThrowException()
{
$this->resetInfrastructure();

$process = $this->runAsyncCommand([
'command-bus:consume-commands',
]);

usleep(500000);

$promise = $this
->getCommandBus()
->execute(new ThrowException());

await($promise, $this->getLoop());
usleep(500000);
$output = $process->getOutput();
$this->assertTrue(substr_count($output, 'Rejected') > 1);
$process->stop();
}

public function testReturnRejectedPromise()
{
$this->resetInfrastructure();

$process = $this->runAsyncCommand([
'command-bus:consume-commands',
]);

usleep(500000);

$promise = $this
->getCommandBus()
->execute(new RejectException());

await($promise, $this->getLoop());
usleep(500000);
$output = $process->getOutput();
$this->assertTrue(substr_count($output, 'Rejected') > 1);
$process->stop();
}

public function testNoRecoverableCommand()
{
$this->resetInfrastructure();

$process = $this->runAsyncCommand([
'command-bus:consume-commands',
]);

usleep(500000);

$promise = $this
->getCommandBus()
->execute(new NotRecoverableCommand());

await($promise, $this->getLoop());
usleep(500000);
$output = $process->getOutput();
$this->assertTrue(1 === substr_count($output, 'Rejected'));
$this->assertStringContainsString("\033[01;31mRejected\033[0m NotRecoverableCommand", $output);
$process->stop();
}

/**
* Reset infrastructure.
*
Expand Down Expand Up @@ -274,7 +362,10 @@ protected function consumeCommands(int $limit): string
usleep(100000);
}

return $process->getOutput();
$output = $process->getOutput();
$process->stop();

return $output;
}

/**
Expand Down
Loading

0 comments on commit f8c6281

Please sign in to comment.