Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added prefetch options in consumer command #34

Merged
merged 1 commit into from
Aug 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions Async/AMQPAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -162,20 +162,22 @@ public function enqueue($command): PromiseInterface
* @param CommandBus $bus
* @param int $limit
* @param OutputPrinter $outputPrinter
* @param Prefetch $prefetch
*
* @throws InvalidCommandException
*/
public function consume(
CommandBus $bus,
int $limit,
OutputPrinter $outputPrinter
OutputPrinter $outputPrinter,
Prefetch $prefetch
) {
$this->resetIterations($limit);
$forced = false;

$this
->channel
->qos(0, 1, true)
->qos($prefetch->getPrefetchSize(), $prefetch->getPrefetchCount(), $prefetch->isGlobal())
->then(function () use ($bus, $outputPrinter, &$forced) {
return $this
->channel
Expand Down
4 changes: 3 additions & 1 deletion Async/AsyncAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,15 @@ abstract public function enqueue($command): PromiseInterface;
* @param CommandBus $bus
* @param int $limit
* @param OutputPrinter $outputPrinter
* @param Prefetch $prefetch
*
* @throws InvalidCommandException
*/
abstract public function consume(
CommandBus $bus,
int $limit,
OutputPrinter $outputPrinter
OutputPrinter $outputPrinter,
Prefetch $prefetch
);

/**
Expand Down
4 changes: 3 additions & 1 deletion Async/InMemoryAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -130,13 +130,15 @@ public function enqueue($command): PromiseInterface
* @param CommandBus $bus
* @param int $limit
* @param OutputPrinter $outputPrinter
* @param Prefetch $prefetch
*
* @throws InvalidCommandException
*/
public function consume(
CommandBus $bus,
int $limit,
OutputPrinter $outputPrinter
OutputPrinter $outputPrinter,
Prefetch $prefetch
) {
$this->resetIterations($limit);

Expand Down
4 changes: 3 additions & 1 deletion Async/PostgreSQLAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -207,13 +207,15 @@ public function enqueue($command): PromiseInterface
* @param CommandBus $bus
* @param int $limit
* @param OutputPrinter $outputPrinter
* @param Prefetch $prefetch
*
* @throws InvalidCommandException
*/
public function consume(
CommandBus $bus,
int $limit,
OutputPrinter $outputPrinter
OutputPrinter $outputPrinter,
Prefetch $prefetch
) {
$this->resetIterations($limit);
$keepChecking = $this->consumeAvailableElements($bus, $outputPrinter);
Expand Down
65 changes: 65 additions & 0 deletions Async/Prefetch.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
<?php

/*
* This file is part of the DriftPHP Project
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*
* Feel free to edit as you please, and have fun.
*
* @author Marc Morera <yuhu@mmoreram.com>
*/

declare(strict_types=1);

namespace Drift\CommandBus\Async;

/**
* Class Prefetch.
*/
class Prefetch
{
private int $prefetchSize;
private int $prefetchCount;
private bool $global;

/**
* @param int $prefetchSize
* @param int $prefetchCount
* @param bool $global
*/
public function __construct(
int $prefetchSize,
int $prefetchCount,
bool $global
) {
$this->prefetchSize = $prefetchSize;
$this->prefetchCount = $prefetchCount;
$this->global = $global;
}

/**
* @return int
*/
public function getPrefetchSize(): int
{
return $this->prefetchSize;
}

/**
* @return int
*/
public function getPrefetchCount(): int
{
return $this->prefetchCount;
}

/**
* @return bool
*/
public function isGlobal(): bool
{
return $this->global;
}
}
4 changes: 3 additions & 1 deletion Async/RedisAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -134,13 +134,15 @@ public function enqueue($command): PromiseInterface
* @param CommandBus $bus
* @param int $limit
* @param OutputPrinter $outputPrinter
* @param Prefetch $prefetch
*
* @throws InvalidCommandException
*/
public function consume(
CommandBus $bus,
int $limit,
OutputPrinter $outputPrinter
OutputPrinter $outputPrinter,
Prefetch $prefetch
) {
$this->resetIterations($limit);

Expand Down
14 changes: 13 additions & 1 deletion Console/CommandConsumerCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
namespace Drift\CommandBus\Console;

use Drift\CommandBus\Async\AsyncAdapter;
use Drift\CommandBus\Async\Prefetch;
use Drift\CommandBus\Bus\InlineCommandBus;
use Drift\Console\OutputPrinter;
use Drift\EventBus\Bus\EventBus;
Expand Down Expand Up @@ -78,6 +79,10 @@ protected function configure()
'Exchanges to listen'
);
}

$this->addOption('prefetch-size', null, InputOption::VALUE_OPTIONAL, 'Prefetch size. Only for AMQP', 0);
$this->addOption('prefetch-count', null, InputOption::VALUE_OPTIONAL, 'Prefetch count. Only for AMQP', 1);
$this->addOption('is-prefetch-local', null, InputOption::VALUE_NONE, 'Prefetch is global. Only for AMQP');
}

/**
Expand Down Expand Up @@ -113,12 +118,19 @@ class_exists(EventBusSubscriber::class) &&
);
}

$prefetch = new Prefetch(
\intval($input->getOption('prefetch-size')),
\intval($input->getOption('prefetch-count')),
!\boolval($input->getOption('is-prefetch-local')),
);

$this
->asyncAdapter
->consume(
$this->commandBus,
\intval($input->getOption('limit')),
$outputPrinter
$outputPrinter,
$prefetch
);

return 0;
Expand Down