Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Some changes to command bus #31

Merged
merged 1 commit into from
Oct 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
/vendor
/var
/composer.lock
/.php_cs.cache
/.php_cs.cache
/.phpunit.result.cache
1 change: 0 additions & 1 deletion .phpunit.result.cache

This file was deleted.

13 changes: 9 additions & 4 deletions Async/AMQPAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
use Bunny\Exception\ClientException;
use Bunny\Message;
use Drift\CommandBus\Bus\CommandBus;
use Drift\CommandBus\Bus\NonRecoverableCommand;
use Drift\CommandBus\Console\CommandBusHeaderMessage;
use Drift\CommandBus\Console\CommandBusLineMessage;
use Drift\CommandBus\Exception\InvalidCommandException;
Expand Down Expand Up @@ -179,16 +180,20 @@ public function consume(
return $this
->channel
->consume(function (Message $message, Channel $channel) use ($bus, $outputPrinter, &$forced) {
$command = unserialize($message->content);

return $this
->executeCommand(
$bus,
unserialize($message->content),
$command,
$outputPrinter,
function () use ($message, $channel) {
function () use ($channel, $message) {
return $channel->ack($message);
},
function () use ($message, $channel) {
return $channel->nack($message);
function () use ($command, $channel, $message) {
return $command instanceof NonRecoverableCommand
? $channel->ack($message)
: $channel->nack($message);
},
function () use (&$forced) {
$forced = true;
Expand Down
12 changes: 10 additions & 2 deletions Async/InMemoryAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

use function Clue\React\Block\await;
use Drift\CommandBus\Bus\CommandBus;
use Drift\CommandBus\Bus\NonRecoverableCommand;
use Drift\CommandBus\Console\CommandBusLineMessage;
use Drift\CommandBus\Exception\InvalidCommandException;
use Drift\Console\OutputPrinter;
Expand Down Expand Up @@ -140,13 +141,20 @@ public function consume(
$this->resetIterations($limit);

foreach ($this->queue as $key => $command) {
$notShouldRecover = !$command instanceof NonRecoverableCommand;
if ($notShouldRecover) {
unset($this->queue[$key]);
}

$promise = $this
->executeCommand(
$bus,
$command,
$outputPrinter,
function () use ($key) {
unset($this->queue[$key]);
function () use ($key, $notShouldRecover) {
if (!$notShouldRecover) {
unset($this->queue[$key]);
}
},
function () {},
function () {
Expand Down
22 changes: 18 additions & 4 deletions Async/PostgreSQLAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

use function Clue\React\Block\await;
use Drift\CommandBus\Bus\CommandBus;
use Drift\CommandBus\Bus\NonRecoverableCommand;
use Drift\CommandBus\Console\CommandBusHeaderMessage;
use Drift\CommandBus\Console\CommandBusLineMessage;
use Drift\CommandBus\Exception\InvalidCommandException;
Expand Down Expand Up @@ -233,12 +234,19 @@ public function consume(
return true;
}

$command = unserialize(stripslashes($message['payload']));

return $this->executeCommand(
$bus,
unserialize(stripslashes($message['payload'])),
$command,
$outputPrinter,
function () {},
function () {},
function () use ($command) {
$shouldRecover = !$command instanceof NonRecoverableCommand;
if ($shouldRecover) {
return $this->enqueue($command);
}
},
function () use (&$forced) {
$this
->loop
Expand Down Expand Up @@ -279,13 +287,19 @@ private function consumeAvailableElements(
return true;
}

$command = unserialize(stripslashes($message['payload']));

return $this->executeCommand(
$bus,
unserialize(stripslashes($message['payload'])),
$command,
$outputPrinter,
function () {
},
function () {
function () use ($command) {
$shouldRecover = !$command instanceof NonRecoverableCommand;
if ($shouldRecover) {
return $this->enqueue($command);
}
},
function () {
return false;
Expand Down
12 changes: 10 additions & 2 deletions Async/RedisAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
use function Clue\React\Block\await;
use Clue\React\Redis\Client;
use Drift\CommandBus\Bus\CommandBus;
use Drift\CommandBus\Bus\NonRecoverableCommand;
use Drift\CommandBus\Console\CommandBusLineMessage;
use Drift\CommandBus\Exception\InvalidCommandException;
use Drift\Console\OutputPrinter;
Expand Down Expand Up @@ -148,12 +149,19 @@ public function consume(
->redis
->blPop($this->key, 0)
->then(function (array $job) use ($bus, $outputPrinter) {
$command = unserialize($job[1]);

return $this->executeCommand(
$bus,
unserialize($job[1]),
$command,
$outputPrinter,
function () {},
function () {},
function () use ($command) {
$shouldRecover = !$command instanceof NonRecoverableCommand;
if ($shouldRecover) {
return $this->enqueue($command);
}
},
function () {
return true;
}
Expand Down
15 changes: 10 additions & 5 deletions Bus/CommandBus.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

use Drift\CommandBus\Exception\InvalidCommandException;
use React\Promise\PromiseInterface;
use function React\Promise\reject;

/**
* Class CommandBus.
Expand All @@ -34,10 +35,14 @@ class CommandBus extends Bus
*/
public function execute($command): PromiseInterface
{
return $this
->handle($command)
->then(function () {
return;
});
try {
return $this
->handle($command)
->then(function () {
return;
});
} catch (\Exception $exception) {
return reject($exception);
}
}
}
20 changes: 20 additions & 0 deletions Bus/NonRecoverableCommand.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<?php

/*
* This file is part of the DriftPHP Project
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*
* Feel free to edit as you please, and have fun.
*
* @author Marc Morera <yuhu@mmoreram.com>
*/

declare(strict_types=1);

namespace Drift\CommandBus\Bus;

interface NonRecoverableCommand
{
}
7 changes: 6 additions & 1 deletion Bus/QueryBus.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

use Drift\CommandBus\Exception\InvalidCommandException;
use React\Promise\PromiseInterface;
use function React\Promise\reject;

/**
* Class QueryBus.
Expand All @@ -34,6 +35,10 @@ class QueryBus extends Bus
*/
public function ask($query): PromiseInterface
{
return $this->handle($query);
try {
return $this->handle($query);
} catch (\Exception $exception) {
return reject($exception);
}
}
}
11 changes: 2 additions & 9 deletions Console/CommandConsumerCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,8 @@
*/
class CommandConsumerCommand extends Command
{
/**
* @var AsyncAdapter
*/
private $asyncAdapter;

/**
* @var InlineCommandBus
*/
private $commandBus;
private AsyncAdapter $asyncAdapter;
private InlineCommandBus $commandBus;

/**
* ConsumeCommand constructor.
Expand Down
99 changes: 95 additions & 4 deletions Tests/Async/AsyncAdapterTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,15 @@
use Drift\CommandBus\Tests\Command\ChangeAThing;
use Drift\CommandBus\Tests\Command\ChangeBThing;
use Drift\CommandBus\Tests\Command\ChangeYetAnotherThing;
use Drift\CommandBus\Tests\Command\NotRecoverableCommand;
use Drift\CommandBus\Tests\Command\RejectException;
use Drift\CommandBus\Tests\Command\ThrowException;
use Drift\CommandBus\Tests\CommandHandler\ChangeAnotherThingHandler;
use Drift\CommandBus\Tests\CommandHandler\ChangeAThingHandler;
use Drift\CommandBus\Tests\CommandHandler\ChangeYetAnotherThingHandler;
use Drift\CommandBus\Tests\CommandHandler\NotRecoverableCommandHandler;
use Drift\CommandBus\Tests\CommandHandler\RejectExceptionHandler;
use Drift\CommandBus\Tests\CommandHandler\ThrowExceptionHandler;
use Drift\CommandBus\Tests\Context;
use Drift\CommandBus\Tests\Middleware\Middleware1;

Expand Down Expand Up @@ -62,6 +68,24 @@ protected static function decorateConfiguration(array $configuration): array
],
];

$configuration['services'][ThrowExceptionHandler::class] = [
'tags' => [
['name' => 'command_handler', 'method' => 'handle'],
],
];

$configuration['services'][RejectExceptionHandler::class] = [
'tags' => [
['name' => 'command_handler', 'method' => 'handle'],
],
];

$configuration['services'][NotRecoverableCommandHandler::class] = [
'tags' => [
['name' => 'command_handler', 'method' => 'handle'],
],
];

$configuration['imports'] = [
['resource' => __DIR__.'/../autowiring.yml'],
];
Expand Down Expand Up @@ -192,14 +216,14 @@ public function testAsyncCommands()
'command-bus:consume-commands',
]);

usleep(200000);
usleep(500000);

$promise1 = $this
->getCommandBus()
->execute(new ChangeAThing('thing'));

await($promise1, $this->getLoop());
usleep(200000);
usleep(500000);

$promises = [];
$promises[] = $this
Expand All @@ -211,14 +235,78 @@ public function testAsyncCommands()
->execute(new ChangeYetAnotherThing('thing'));

awaitAll($promises, $this->getLoop());
usleep(200000);
usleep(500000);
$output = $process->getOutput();
$this->assertStringContainsString("\033[01;32mConsumed\033[0m ChangeAThing", $output);
$this->assertStringContainsString("\033[01;32mConsumed\033[0m ChangeAnotherThing", $output);
$this->assertStringContainsString("\033[01;32mConsumed\033[0m ChangeYetAnotherThing", $output);
$process->stop();
}

public function testThrowException()
{
$this->resetInfrastructure();

$process = $this->runAsyncCommand([
'command-bus:consume-commands',
]);

usleep(500000);

$promise = $this
->getCommandBus()
->execute(new ThrowException());

await($promise, $this->getLoop());
usleep(500000);
$output = $process->getOutput();
$this->assertTrue(substr_count($output, 'Rejected') > 1);
$process->stop();
}

public function testReturnRejectedPromise()
{
$this->resetInfrastructure();

$process = $this->runAsyncCommand([
'command-bus:consume-commands',
]);

usleep(500000);

$promise = $this
->getCommandBus()
->execute(new RejectException());

await($promise, $this->getLoop());
usleep(500000);
$output = $process->getOutput();
$this->assertTrue(substr_count($output, 'Rejected') > 1);
$process->stop();
}

public function testNoRecoverableCommand()
{
$this->resetInfrastructure();

$process = $this->runAsyncCommand([
'command-bus:consume-commands',
]);

usleep(500000);

$promise = $this
->getCommandBus()
->execute(new NotRecoverableCommand());

await($promise, $this->getLoop());
usleep(500000);
$output = $process->getOutput();
$this->assertTrue(1 === substr_count($output, 'Rejected'));
$this->assertStringContainsString("\033[01;31mRejected\033[0m NotRecoverableCommand", $output);
$process->stop();
}

/**
* Reset infrastructure.
*
Expand Down Expand Up @@ -274,7 +362,10 @@ protected function consumeCommands(int $limit): string
usleep(100000);
}

return $process->getOutput();
$output = $process->getOutput();
$process->stop();

return $output;
}

/**
Expand Down
Loading