diff --git a/.editorconfig b/.editorconfig index fa2adc8..131e7a2 100644 --- a/.editorconfig +++ b/.editorconfig @@ -4,5 +4,5 @@ root = true end_of_line = lf insert_final_newline = true trim_trailing_whitespace = true -indent_style = spaces +indent_style = space charset = utf-8 diff --git a/.github/workflows/Tests.yaml b/.github/workflows/Tests.yaml new file mode 100644 index 0000000..28dcb44 --- /dev/null +++ b/.github/workflows/Tests.yaml @@ -0,0 +1,59 @@ +name: Tests + +on: + push: + pull_request: + workflow_dispatch: + +jobs: + Test: + runs-on: ubuntu-latest + + strategy: + fail-fast: false + matrix: + php: + - 8.1 + dependencies: + - hi + - lo + + steps: + - uses: actions/checkout@v3 + + - name: Install Beanstalk + run: sudo apt-get install --yes beanstalkd + + - name: Setup PHP ${{ matrix.php }} + uses: shivammathur/setup-php@v2 + with: + php-version: ${{ matrix.php }} + coverage: xdebug + + - name: Validate composer.json + run: composer validate + + - name: Cache dependencies + id: composer-cache + uses: actions/cache@v3 + with: + path: vendor + key: php-${{ matrix.php }}-${{ matrix.dependencies }}-${{ hashFiles('composer.json') }} + restore-keys: php-${{ matrix.php }}-${{ matrix.dependencies }}- + + - name: Install dependencies ${{ matrix.dependencies == 'lo' && '(lowest)' || '' }} + run: composer update --no-interaction --no-progress + ${{ matrix.dependencies == 'lo' && '--prefer-lowest' || '' }} + + - name: Run test suite with coverage + run: composer test -- --coverage-clover=build/logs/clover.xml + env: + AMP_TEST_BEANSTALK_INTEGRATION: 1 + + - name: Check code style + run: composer cs + + - name: Upload test coverage + run: composer global require php-coveralls/php-coveralls && php-coveralls -v + env: + COVERALLS_REPO_TOKEN: ${{ secrets.GITHUB_TOKEN }} diff --git a/.gitignore b/.gitignore index 5293e46..cf1c6d6 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,8 @@ +/.*/ +!/.github/ /coverage/ /composer.lock /vendor/ /protocol.txt -/.php_cs.cache +/.php-cs-fixer.cache +.phpunit.result.cache diff --git a/.php_cs.dist b/.php-cs-fixer.dist.php similarity index 86% rename from .php_cs.dist rename to .php-cs-fixer.dist.php index 106b620..e980f09 100644 --- a/.php_cs.dist +++ b/.php-cs-fixer.dist.php @@ -1,19 +1,18 @@ setRiskyAllowed(true) ->setRules([ "@PSR1" => true, "@PSR2" => true, "braces" => [ "allow_single_line_closure" => true, - "position_after_functions_and_oop_constructs" => "same", ], "array_syntax" => ["syntax" => "short"], "cast_spaces" => true, "combine_consecutive_unsets" => true, "function_to_constant" => true, - "no_multiline_whitespace_before_semicolons" => true, + "multiline_whitespace_before_semicolons" => true, "no_unused_imports" => true, "no_useless_else" => true, "no_useless_return" => true, @@ -27,7 +26,7 @@ "php_unit_fqcn_annotation" => true, "phpdoc_summary" => true, "phpdoc_types" => true, - "psr4" => true, + "psr_autoloading" => true, "return_type_declaration" => ["space_before" => "none"], "short_scalar_cast" => true, "single_blank_line_before_namespace" => true, diff --git a/.travis.yml b/.travis.yml deleted file mode 100644 index 02ae037..0000000 --- a/.travis.yml +++ /dev/null @@ -1,33 +0,0 @@ -sudo: required - -language: php - -php: - - 7.1 - - 7.2 - - 7.3 - - 7.4 - - nightly - -matrix: - allow_failures: - - php: nightly - fast_finish: true - -install: - - sudo apt-get install -qq --force-yes beanstalkd - - beanstalkd >/dev/null 2>&1 & - - composer update -n --prefer-dist - - wget https://github.com/php-coveralls/php-coveralls/releases/download/v1.0.0/coveralls.phar - - chmod +x coveralls.phar - -script: - - phpdbg -qrr vendor/bin/phpunit --coverage-text --coverage-clover build/logs/clover.xml - - PHP_CS_FIXER_IGNORE_ENV=1 php vendor/bin/php-cs-fixer --diff --dry-run -v fix - -after_script: - - ./coveralls.phar -v - -cache: - directories: - - $HOME/.composer/cache/files diff --git a/composer.json b/composer.json index 3f062c9..8dd1abb 100644 --- a/composer.json +++ b/composer.json @@ -13,20 +13,25 @@ "Amp\\Beanstalk\\": "src" } }, + "minimum-stability": "beta", + "prefer-stable": true, "require": { - "amphp/amp": "^2", - "amphp/socket": "^1.0", - "amphp/uri": "^0.1", + "php": "^8.1", + "amphp/amp": "^v3.0.0-beta.9", + "amphp/socket": "^v2.0.0-beta.6", + "amphp/uri": "^0.1.4", "symfony/yaml": "^3.3|^4|^5" }, "require-dev": { - "amphp/phpunit-util": "^1", - "phpunit/phpunit": "^6", - "friendsofphp/php-cs-fixer": "^2.3" + "amphp/phpunit-util": "^v3.0.0-beta.3", + "friendsofphp/php-cs-fixer": "^3.12", + "phpunit/phpunit": "^9.5.23" + }, + "scripts": { + "test": "phpunit", + "cs": "php-cs-fixer fix --dry-run --diff --verbose" }, "config": { - "platform": { - "php": "7.1.0" - } + "sort-packages": true } } diff --git a/examples/consumer.php b/examples/consumer.php index 2c189dc..41d0198 100644 --- a/examples/consumer.php +++ b/examples/consumer.php @@ -3,16 +3,13 @@ require __DIR__ . '/../vendor/autoload.php'; use Amp\Beanstalk\BeanstalkClient; -use Amp\Loop; -Loop::run(function () { - $beanstalk = new BeanstalkClient("tcp://127.0.0.1:11300"); - yield $beanstalk->watch('foobar'); +$beanstalk = new BeanstalkClient('tcp://127.0.0.1:11300'); +$beanstalk->watch('foobar'); - while (list($jobId, $payload) = yield $beanstalk->reserve()) { - echo "Job id: $jobId\n"; - echo "Payload: $payload\n"; +while (list($jobId, $payload) = $beanstalk->reserve()) { + echo "Job id: $jobId\n"; + echo "Payload: $payload\n"; - $beanstalk->delete($jobId); - } -}); + $beanstalk->delete($jobId); +} diff --git a/examples/producer.php b/examples/producer.php index 9f1104b..1ba56b9 100644 --- a/examples/producer.php +++ b/examples/producer.php @@ -3,21 +3,18 @@ require __DIR__ . '/../vendor/autoload.php'; use Amp\Beanstalk\BeanstalkClient; -use Amp\Loop; -Loop::run(function () { - $beanstalk = new BeanstalkClient("tcp://127.0.0.1:11300"); - yield $beanstalk->use('foobar'); +$beanstalk = new BeanstalkClient('tcp://127.0.0.1:11300'); +$beanstalk->use('foobar'); - $payload = json_encode([ - "job" => bin2hex(random_bytes(16)), - "type" => "compress-image", - "path" => "/path/to/image.png" - ]); +$payload = json_encode([ + 'job' => bin2hex(random_bytes(16)), + 'type' => 'compress-image', + 'path' => '/path/to/image.png' +]); - $jobId = yield $beanstalk->put($payload); +$jobId = $beanstalk->put($payload); - echo "Inserted job id: $jobId\n"; +echo "Inserted job id: $jobId\n"; - $beanstalk->quit(); -}); +$beanstalk->quit(); diff --git a/examples/stats.php b/examples/stats.php index e6d46d6..0f7b3f8 100644 --- a/examples/stats.php +++ b/examples/stats.php @@ -4,17 +4,14 @@ use Amp\Beanstalk\BeanstalkClient; use Amp\Beanstalk\Stats\System; -use Amp\Loop; -Loop::run(function () { - $beanstalk = new BeanstalkClient("tcp://127.0.0.1:11300"); +$beanstalk = new BeanstalkClient('tcp://127.0.0.1:11300'); - /** - * @var System $systemStats - */ - $systemStats = yield $beanstalk->getSystemStats(); - echo "Active connections: {$systemStats->currentConnections}\n"; - echo "Jobs ready: {$systemStats->currentJobsReady}\n"; +/** + * @var System $systemStats + */ +$systemStats = $beanstalk->getSystemStats(); +echo "Active connections: $systemStats->currentConnections\n"; +echo "Jobs ready: $systemStats->currentJobsReady\n"; - $beanstalk->quit(); -}); +$beanstalk->quit(); diff --git a/phpunit.xml.dist b/phpunit.xml.dist index a11edbb..5561cd2 100644 --- a/phpunit.xml.dist +++ b/phpunit.xml.dist @@ -1,6 +1,6 @@ - + ./test diff --git a/src/BadFormatException.php b/src/BadFormatException.php index 282cd7f..187a1d9 100644 --- a/src/BadFormatException.php +++ b/src/BadFormatException.php @@ -2,5 +2,6 @@ namespace Amp\Beanstalk; -class BadFormatException extends BeanstalkException { +class BadFormatException extends BeanstalkException +{ } diff --git a/src/BeanstalkClient.php b/src/BeanstalkClient.php index e786173..2e0dc36 100644 --- a/src/BeanstalkClient.php +++ b/src/BeanstalkClient.php @@ -5,37 +5,34 @@ use Amp\Beanstalk\Stats\Job; use Amp\Beanstalk\Stats\System; use Amp\Beanstalk\Stats\Tube; -use function Amp\call; -use Amp\Deferred; -use Amp\Promise; +use Amp\DeferredFuture; use Amp\Uri\Uri; use Symfony\Component\Yaml\Yaml; use Throwable; -class BeanstalkClient { - /** @var Deferred[] */ - private $deferreds; +class BeanstalkClient +{ + /** @var DeferredFuture[] */ + private array $deferreds; - /** @var Connection */ - private $connection; + private Connection $connection; - /** @var string */ - private $tube; + private ?string $tube; - public function __construct(string $uri) { + public function __construct(string $uri) + { $this->applyUri($uri); $this->deferreds = []; $this->connection = new Connection($uri); $this->connection->addEventHandler("response", function ($response) { - /** @var Deferred $deferred */ $deferred = array_shift($this->deferreds); if ($response instanceof Throwable) { - $deferred->fail($response); + $deferred->error($response); } else { - $deferred->resolve($response); + $deferred->complete($response); } }); @@ -50,37 +47,39 @@ public function __construct(string $uri) { if ($this->tube) { $this->connection->addEventHandler("connect", function () { - array_unshift($this->deferreds, new Deferred); + array_unshift($this->deferreds, new DeferredFuture()); return "use $this->tube\r\n"; }); } } - private function applyUri(string $uri) { + private function applyUri(string $uri): void + { $this->tube = (new Uri($uri))->getQueryParameter("tube"); } - private function send(string $message, callable $transform = null): Promise { - return call(function () use ($message, $transform) { - $this->deferreds[] = $deferred = new Deferred; - $promise = $deferred->promise(); + private function send(string $message, callable $transform = null) + { + $this->deferreds[] = $deferred = new DeferredFuture(); + $future = $deferred->getFuture(); - yield $this->connection->send($message); - $response = yield $promise; + $this->connection->send($message); + $response = $future->await(); - return $transform ? $transform($response) : $response; - }); + return $transform ? $transform($response) : $response; } - public function use(string $tube) { + public function use(string $tube) + { return $this->send("use " . $tube . "\r\n", function () use ($tube) { $this->tube = $tube; return null; }); } - public function pause(string $tube, int $delay): Promise { + public function pause(string $tube, int $delay) + { $payload = "pause-tube $tube $delay\r\n"; return $this->send($payload, function (array $response) use ($tube) { @@ -99,7 +98,8 @@ public function pause(string $tube, int $delay): Promise { }); } - public function put(string $payload, int $timeout = 60, int $delay = 0, $priority = 0): Promise { + public function put(string $payload, int $timeout = 60, int $delay = 0, $priority = 0) + { $payload = "put $priority $delay $timeout " . strlen($payload) . "\r\n$payload\r\n"; return $this->send($payload, function (array $response): int { @@ -125,7 +125,8 @@ public function put(string $payload, int $timeout = 60, int $delay = 0, $priorit }); } - public function reserve(int $timeout = null): Promise { + public function reserve(int $timeout = null) + { $payload = $timeout === null ? "reserve\r\n" : "reserve-with-timeout $timeout\r\n"; return $this->send($payload, function (array $response): array { @@ -147,7 +148,8 @@ public function reserve(int $timeout = null): Promise { }); } - public function delete(int $id): Promise { + public function delete(int $id) + { $payload = "delete $id\r\n"; return $this->send($payload, function (array $response): int { @@ -166,7 +168,8 @@ public function delete(int $id): Promise { }); } - public function release(int $id, int $delay = 0, int $priority = 0): Promise { + public function release(int $id, int $delay = 0, int $priority = 0) + { $payload = "release $id $priority $delay\r\n"; return $this->send($payload, function (array $response): string { @@ -184,7 +187,8 @@ public function release(int $id, int $delay = 0, int $priority = 0): Promise { }); } - public function bury(int $id, int $priority = 0): Promise { + public function bury(int $id, int $priority = 0) + { $payload = "bury $id $priority\r\n"; return $this->send($payload, function (array $response): int { @@ -203,7 +207,8 @@ public function bury(int $id, int $priority = 0): Promise { }); } - public function kickJob(int $id): Promise { + public function kickJob(int $id) + { $payload = "kick-job $id\r\n"; return $this->send($payload, function (array $response): bool { @@ -222,7 +227,8 @@ public function kickJob(int $id): Promise { }); } - public function kick(int $count): Promise { + public function kick(int $count) + { $payload = "kick $count\r\n"; return $this->send($payload, function (array $response): int { @@ -238,7 +244,8 @@ public function kick(int $count): Promise { }); } - public function touch(int $id): Promise { + public function touch(int $id) + { $payload = "touch $id\r\n"; return $this->send($payload, function (array $response): int { @@ -257,7 +264,8 @@ public function touch(int $id): Promise { }); } - public function watch(string $tube): Promise { + public function watch(string $tube) + { $payload = "watch $tube\r\n"; return $this->send($payload, function (array $response): int { @@ -269,7 +277,8 @@ public function watch(string $tube): Promise { }); } - public function ignore(string $tube): Promise { + public function ignore(string $tube) + { $payload = "ignore $tube\r\n"; return $this->send($payload, function (array $response): int { @@ -288,11 +297,13 @@ public function ignore(string $tube): Promise { }); } - public function quit() { + public function quit(): void + { $this->send("quit\r\n"); } - public function getJobStats(int $id): Promise { + public function getJobStats(int $id) + { $payload = "stats-job $id\r\n"; return $this->send($payload, function (array $response) use ($id): Job { @@ -311,7 +322,8 @@ public function getJobStats(int $id): Promise { }); } - public function getTubeStats(string $tube): Promise { + public function getTubeStats(string $tube) + { $payload = "stats-tube $tube\r\n"; return $this->send($payload, function (array $response) use ($tube): Tube { @@ -330,7 +342,8 @@ public function getTubeStats(string $tube): Promise { }); } - public function getSystemStats(): Promise { + public function getSystemStats() + { $payload = "stats\r\n"; return $this->send($payload, function (array $response): System { @@ -342,7 +355,8 @@ public function getSystemStats(): Promise { }); } - public function listTubes(): Promise { + public function listTubes() + { $payload = "list-tubes\r\n"; return $this->send($payload, function (array $response): array { @@ -358,7 +372,8 @@ public function listTubes(): Promise { }); } - public function listWatchedTubes(): Promise { + public function listWatchedTubes() + { $payload = "list-tubes-watched\r\n"; return $this->send($payload, function (array $response): array { @@ -374,7 +389,8 @@ public function listWatchedTubes(): Promise { }); } - public function getUsedTube(): Promise { + public function getUsedTube() + { $payload = "list-tube-used\r\n"; return $this->send($payload, function (array $response): string { @@ -390,7 +406,8 @@ public function getUsedTube(): Promise { }); } - public function peek(int $id): Promise { + public function peek(int $id) + { $payload = "peek $id\r\n"; return $this->send($payload, function (array $response) use ($id): string { @@ -409,19 +426,23 @@ public function peek(int $id): Promise { }); } - public function peekReady(): Promise { + public function peekReady() + { return $this->peekInState('ready'); } - public function peekDelayed(): Promise { + public function peekDelayed() + { return $this->peekInState('delayed'); } - public function peekBuried(): Promise { + public function peekBuried() + { return $this->peekInState('buried'); } - private function peekInState(string $state): Promise { + private function peekInState(string $state) + { $payload = "peek-$state\r\n"; return $this->send( @@ -443,12 +464,13 @@ function (array $response) use ($state): string { ); } - private function failAllDeferreds(Throwable $error) { + private function failAllDeferreds(Throwable $error): void + { // Fail any outstanding promises while ($this->deferreds) { - /** @var Deferred $deferred */ + /** @var DeferredFuture $deferred */ $deferred = array_shift($this->deferreds); - $deferred->fail($error); + $deferred->error($error); } } } diff --git a/src/BeanstalkException.php b/src/BeanstalkException.php index ab316f0..6b3ff4c 100644 --- a/src/BeanstalkException.php +++ b/src/BeanstalkException.php @@ -2,5 +2,6 @@ namespace Amp\Beanstalk; -class BeanstalkException extends \Exception { +class BeanstalkException extends \Exception +{ } diff --git a/src/ConnectException.php b/src/ConnectException.php index afc4c71..13d3b7e 100644 --- a/src/ConnectException.php +++ b/src/ConnectException.php @@ -2,5 +2,6 @@ namespace Amp\Beanstalk; -class ConnectException extends BeanstalkException { +class ConnectException extends BeanstalkException +{ } diff --git a/src/Connection.php b/src/Connection.php index ee4655c..66ec3f6 100644 --- a/src/Connection.php +++ b/src/Connection.php @@ -2,35 +2,27 @@ namespace Amp\Beanstalk; -use function Amp\asyncCall; -use function Amp\call; -use Amp\Deferred; +use function Amp\async; +use Amp\DeferredFuture; +use Amp\Future; use function Amp\Socket\connect; use Amp\Socket\ConnectContext; use Amp\Socket\Socket; -use Amp\Success; use Amp\Uri\Uri; -class Connection { - /** @var Deferred */ - private $connectPromisor; - - /** @var Parser */ - private $parser; - - /** @var int */ - private $timeout = 5000; - - /** @var Socket */ - private $socket; - - /** @var string */ - private $uri; +class Connection +{ + private ?DeferredFuture $connectFuture = null; + private Parser $parser; + private int $timeout = 5000; + private ?Socket $socket = null; + private string $uri; /** @var callable[][] */ - private $handlers; + private array $handlers; - public function __construct(string $uri) { + public function __construct(string $uri) + { $this->applyUri($uri); $this->handlers = [ "connect" => [], @@ -50,14 +42,16 @@ public function __construct(string $uri) { }); } - private function applyUri($uri) { + private function applyUri($uri): void + { $uri = new Uri($uri); $this->timeout = (int) ($uri->getQueryParameter("timeout") ?? $this->timeout); $this->uri = $uri->getScheme() . "://" . $uri->getHost() . ":" . $uri->getPort(); } - public function addEventHandler($events, callable $callback) { + public function addEventHandler($events, callable $callback): void + { $events = (array) $events; foreach ($events as $event) { @@ -69,42 +63,27 @@ public function addEventHandler($events, callable $callback) { } } - public function send(string $payload) { - return call(function () use ($payload) { - yield $this->connect(); - yield $this->socket->write($payload); - }); + public function send(string $payload): void + { + $this->connect()->await(); + $this->socket->write($payload); } - private function connect() { + private function connect(): Future + { // If we're in the process of connecting already return that same promise - if ($this->connectPromisor) { - return $this->connectPromisor->promise(); + if ($this->connectFuture) { + return $this->connectFuture->getFuture(); } // If a read watcher exists we know we're already connected if ($this->socket) { - return new Success; + return Future::complete(); } - $this->connectPromisor = new Deferred; - $socketPromise = connect($this->uri, (new ConnectContext)->withConnectTimeout($this->timeout)); - - $socketPromise->onResolve(function ($error, $socket) { - $connectPromisor = $this->connectPromisor; - $this->connectPromisor = null; - - if ($error) { - $connectPromisor->fail(new ConnectException( - "Connection attempt failed", - $code = 0, - $error - )); - - return; - } - - $this->socket = $socket; + $this->connectFuture = $connectFuture = new DeferredFuture(); + $socketFuture = async(function () use ($connectFuture) { + $this->socket = connect($this->uri, (new ConnectContext)->withConnectTimeout($this->timeout)); foreach ($this->handlers["connect"] as $handler) { $pipelinedCommand = $handler(); @@ -114,21 +93,30 @@ private function connect() { } } - asyncCall(function () { - while (null !== $chunk = yield $this->socket->read()) { + async(function () { + while (null !== $chunk = $this->socket->read()) { $this->parser->send($chunk); } $this->close(); }); - $connectPromisor->resolve(); + $connectFuture->complete(); + }); + + $socketFuture->finally(function () { + $this->connectFuture = null; + }); + + $socketFuture->catch(function (\Throwable $error) use ($connectFuture) { + $connectFuture->error(new ConnectException('Connection attempt failed', 0, $error)); }); - return $this->connectPromisor->promise(); + return $this->connectFuture->getFuture(); } - private function onError(\Throwable $exception) { + private function onError(\Throwable $exception): void + { foreach ($this->handlers["error"] as $handler) { $handler($exception); } @@ -136,7 +124,8 @@ private function onError(\Throwable $exception) { $this->close(); } - public function close() { + public function close(): void + { $this->parser->reset(); if ($this->socket) { @@ -149,7 +138,8 @@ public function close() { } } - public function __destruct() { + public function __destruct() + { $this->close(); } } diff --git a/src/ConnectionClosedException.php b/src/ConnectionClosedException.php index ea15233..4372b1b 100644 --- a/src/ConnectionClosedException.php +++ b/src/ConnectionClosedException.php @@ -2,5 +2,6 @@ namespace Amp\Beanstalk; -class ConnectionClosedException extends BeanstalkException { +class ConnectionClosedException extends BeanstalkException +{ } diff --git a/src/DeadlineSoonException.php b/src/DeadlineSoonException.php index 06d89f8..c784164 100644 --- a/src/DeadlineSoonException.php +++ b/src/DeadlineSoonException.php @@ -2,5 +2,6 @@ namespace Amp\Beanstalk; -class DeadlineSoonException extends BeanstalkException { +class DeadlineSoonException extends BeanstalkException +{ } diff --git a/src/DrainingException.php b/src/DrainingException.php index e50fe70..b1561c5 100644 --- a/src/DrainingException.php +++ b/src/DrainingException.php @@ -2,5 +2,6 @@ namespace Amp\Beanstalk; -class DrainingException extends BeanstalkException { +class DrainingException extends BeanstalkException +{ } diff --git a/src/ExpectedCrlfException.php b/src/ExpectedCrlfException.php index 0e94258..b5133e6 100644 --- a/src/ExpectedCrlfException.php +++ b/src/ExpectedCrlfException.php @@ -2,5 +2,6 @@ namespace Amp\Beanstalk; -class ExpectedCrlfException extends BeanstalkException { +class ExpectedCrlfException extends BeanstalkException +{ } diff --git a/src/InternalErrorException.php b/src/InternalErrorException.php index ab73531..6fd9d9f 100644 --- a/src/InternalErrorException.php +++ b/src/InternalErrorException.php @@ -2,5 +2,6 @@ namespace Amp\Beanstalk; -class InternalErrorException extends BeanstalkException { +class InternalErrorException extends BeanstalkException +{ } diff --git a/src/JobTooBigException.php b/src/JobTooBigException.php index cfb24fe..25d7f60 100644 --- a/src/JobTooBigException.php +++ b/src/JobTooBigException.php @@ -2,5 +2,6 @@ namespace Amp\Beanstalk; -class JobTooBigException extends BeanstalkException { +class JobTooBigException extends BeanstalkException +{ } diff --git a/src/NotFoundException.php b/src/NotFoundException.php index 42dabea..73a26de 100644 --- a/src/NotFoundException.php +++ b/src/NotFoundException.php @@ -2,5 +2,6 @@ namespace Amp\Beanstalk; -class NotFoundException extends BeanstalkException { +class NotFoundException extends BeanstalkException +{ } diff --git a/src/NotIgnoredException.php b/src/NotIgnoredException.php index fb6ee69..fcdf855 100644 --- a/src/NotIgnoredException.php +++ b/src/NotIgnoredException.php @@ -2,5 +2,6 @@ namespace Amp\Beanstalk; -class NotIgnoredException extends BeanstalkException { +class NotIgnoredException extends BeanstalkException +{ } diff --git a/src/OutOfMemoryException.php b/src/OutOfMemoryException.php index f683459..d88882b 100644 --- a/src/OutOfMemoryException.php +++ b/src/OutOfMemoryException.php @@ -2,5 +2,6 @@ namespace Amp\Beanstalk; -class OutOfMemoryException extends BeanstalkException { +class OutOfMemoryException extends BeanstalkException +{ } diff --git a/src/Parser.php b/src/Parser.php index 2a3b467..0688bdc 100644 --- a/src/Parser.php +++ b/src/Parser.php @@ -2,7 +2,8 @@ namespace Amp\Beanstalk; -class Parser { +class Parser +{ const CRLF = "\r\n"; const ERROR_OUT_OF_MEMORY = "OUT_OF_MEMORY"; @@ -13,11 +14,13 @@ class Parser { private $responseCallback; private $buffer = ""; - public function __construct(callable $responseCallback) { + public function __construct(callable $responseCallback) + { $this->responseCallback = $responseCallback; } - public function send(string $bytes) { + public function send(string $bytes): void + { $this->buffer .= $bytes; do { @@ -88,7 +91,8 @@ public function send(string $bytes) { } while (isset($this->buffer[0])); } - public function reset() { + public function reset(): void + { $this->buffer = ""; } } diff --git a/src/Stats/Job.php b/src/Stats/Job.php index 541ac7c..30ffea9 100644 --- a/src/Stats/Job.php +++ b/src/Stats/Job.php @@ -2,17 +2,15 @@ namespace Amp\Beanstalk\Stats; -use Amp\Struct; - -class Job { - use Struct; - +class Job +{ const STATE_READY = "ready"; const STATE_DELAYED = "delayed"; const STATE_RESERVED = "reserved"; const STATE_BURIED = "buried"; - public function __construct(array $struct) { + public function __construct(array $struct) + { $this->id = (int) $struct["id"]; $this->tube = $struct["tube"]; $this->state = $struct["state"]; diff --git a/src/Stats/System.php b/src/Stats/System.php index 49067b2..d134ecc 100644 --- a/src/Stats/System.php +++ b/src/Stats/System.php @@ -2,12 +2,10 @@ namespace Amp\Beanstalk\Stats; -use Amp\Struct; - -class System { - use Struct; - - public function __construct(array $struct) { +class System +{ + public function __construct(array $struct) + { $this->currentJobsUrgent = (int) $struct["current-jobs-urgent"]; $this->currentJobsReady = (int) $struct["current-jobs-ready"]; $this->currentJobsReserved = (int) $struct["current-jobs-reserved"]; diff --git a/src/Stats/Tube.php b/src/Stats/Tube.php index f24b000..c92a056 100644 --- a/src/Stats/Tube.php +++ b/src/Stats/Tube.php @@ -2,12 +2,10 @@ namespace Amp\Beanstalk\Stats; -use Amp\Struct; - -class Tube { - use Struct; - - public function __construct(array $struct) { +class Tube +{ + public function __construct(array $struct) + { $this->name = $struct["name"]; $this->currentJobsUrgent = (int) $struct["current-jobs-urgent"]; $this->currentJobsReady = (int) $struct["current-jobs-ready"]; diff --git a/src/TimedOutException.php b/src/TimedOutException.php index 147bc0f..b40d1cb 100644 --- a/src/TimedOutException.php +++ b/src/TimedOutException.php @@ -2,5 +2,6 @@ namespace Amp\Beanstalk; -class TimedOutException extends BeanstalkException { +class TimedOutException extends BeanstalkException +{ } diff --git a/src/UnknownCommandException.php b/src/UnknownCommandException.php index 9d5c72e..5eaf7df 100644 --- a/src/UnknownCommandException.php +++ b/src/UnknownCommandException.php @@ -2,5 +2,6 @@ namespace Amp\Beanstalk; -class UnknownCommandException extends BeanstalkException { +class UnknownCommandException extends BeanstalkException +{ } diff --git a/test/BeanstalkClientConnectionClosedTest.php b/test/BeanstalkClientConnectionClosedTest.php index 069468d..9134249 100644 --- a/test/BeanstalkClientConnectionClosedTest.php +++ b/test/BeanstalkClientConnectionClosedTest.php @@ -2,58 +2,61 @@ namespace Amp\Beanstalk\Test; +use function Amp\async; use Amp\Beanstalk\BeanstalkClient; use Amp\Beanstalk\ConnectionClosedException; -use function Amp\call; -use Amp\Delayed; +use function Amp\delay; +use Amp\Future; use Amp\PHPUnit\AsyncTestCase; -use function Amp\Promise\all; -use Amp\Socket\Server; -use Amp\Socket\SocketException; +use Amp\Socket\InternetAddress; +use Amp\Socket\ResourceSocketServerFactory; +use Amp\Socket\SocketServer; -class BeanstalkClientConnectionClosedTest extends AsyncTestCase { - /** @var Server */ - private $server; +class BeanstalkClientConnectionClosedTest extends AsyncTestCase +{ + private SocketServer $server; - /** - * @throws SocketException - */ - public function setUp() { + public function setUp(): void + { parent::setUp(); - $this->server = Server::listen("tcp://127.0.0.1:0"); + + $this->server = (new ResourceSocketServerFactory)->listen(new InternetAddress('127.0.0.1', 0)); } - public function tearDown() { + public function tearDown(): void + { parent::tearDown(); + $this->server->close(); } /** - * @dataProvider dataProviderReserve - * - * @param $reserveTimeout int|null Seconds - * @param $connectionCloseTimeout int Milliseconds - * @param $testFailTimeout int Milliseconds - * @return \Generator + * @dataProvider provideReserveTimeouts */ - public function testReserve($reserveTimeout, $connectionCloseTimeout, $testFailTimeout) { - $beanstalk = new BeanstalkClient("tcp://". $this->server->getAddress()); - $connectionClosePromise = call(function ($connectionCloseTimeout) { - yield new Delayed($connectionCloseTimeout); + public function testReserve(?int $reserveTimeout, float $connectionCloseTimeout, float $testFailTimeout): void + { + $beanstalk = new BeanstalkClient('tcp://' . $this->server->getAddress()); + + $connectionClosePromise = async(function () use ($connectionCloseTimeout) { + delay($connectionCloseTimeout); + $this->server->close(); - }, $connectionCloseTimeout); + }); + $this->setTimeout($testFailTimeout); $this->expectException(ConnectionClosedException::class); - yield all([ + + Future\awaitAll([ $beanstalk->reserve($reserveTimeout), $connectionClosePromise ]); } - public function dataProviderReserve(): array { + public function provideReserveTimeouts(): iterable + { return [ - "no timeout" => [null, 500, 600], - "one second timeout" => [1, 900, 1100], + 'no timeout' => [null, .5, .6], + 'one second timeout' => [1, .9, 1.1], ]; } } diff --git a/test/IntegrationTest.php b/test/IntegrationTest.php index 3d63f0b..dd9483c 100644 --- a/test/IntegrationTest.php +++ b/test/IntegrationTest.php @@ -5,126 +5,118 @@ use Amp\Beanstalk\BeanstalkClient; use Amp\Beanstalk\Stats\Job; use Amp\Beanstalk\Stats\System; -use function Amp\call; -use function Amp\Promise\wait; use PHPUnit\Framework\TestCase; -class IntegrationTest extends TestCase { - /** @var BeanstalkClient */ - private $beanstalk; +class IntegrationTest extends TestCase +{ + private BeanstalkClient $beanstalk; - public function setUp() { + public function setUp(): void + { if (!\getenv("AMP_TEST_BEANSTALK_INTEGRATION") && !\getenv("TRAVIS")) { $this->markTestSkipped("You need to set AMP_TEST_BEANSTALK_INTEGRATION=1 in order to run the integration tests."); } $this->beanstalk = new BeanstalkClient("tcp://127.0.0.1:11300"); - wait(call(function () { - /** @var System $stats */ - $stats = yield $this->beanstalk->getSystemStats(); - for ($jobId = 1; $jobId <= $stats->totalJobs; $jobId++) { - yield $this->beanstalk->delete($jobId); - } - })); + /** @var System $stats */ + $stats = $this->beanstalk->getSystemStats(); + for ($jobId = 1; $jobId <= $stats->totalJobs; $jobId++) { + $this->beanstalk->delete($jobId); + } } - public function testPut() { - wait(call(function () { - /** @var System $statsBefore */ - $statsBefore = yield $this->beanstalk->getSystemStats(); + public function testPut(): void + { + /** @var System $statsBefore */ + $statsBefore = $this->beanstalk->getSystemStats(); - $jobId = yield $this->beanstalk->put("hi"); - $this->assertInternalType("int", $jobId); + $jobId = $this->beanstalk->put("hi"); + $this->assertIsInt($jobId); - /** @var Job $jobStats */ - $jobStats = yield $this->beanstalk->getJobStats($jobId); + /** @var Job $jobStats */ + $jobStats = $this->beanstalk->getJobStats($jobId); - $this->assertSame($jobId, $jobStats->id); - $this->assertSame(0, $jobStats->priority); - $this->assertSame(0, $jobStats->delay); + $this->assertSame($jobId, $jobStats->id); + $this->assertSame(0, $jobStats->priority); + $this->assertSame(0, $jobStats->delay); - /** @var System $statsAfter */ - $statsAfter = yield $this->beanstalk->getSystemStats(); + /** @var System $statsAfter */ + $statsAfter = $this->beanstalk->getSystemStats(); - $this->assertSame($statsBefore->cmdPut + 1, $statsAfter->cmdPut); - })); + $this->assertSame($statsBefore->cmdPut + 1, $statsAfter->cmdPut); } - public function testPeek() { - wait(call(function () { - $jobId = yield $this->beanstalk->put('I am ready'); - $this->assertInternalType("int", $jobId); + public function testPeek(): void + { + $jobId = $this->beanstalk->put('I am ready'); + $this->assertIsInt($jobId); - $peekedJob = yield $this->beanstalk->peek($jobId); - $this->assertEquals('I am ready', $peekedJob); + $peekedJob = $this->beanstalk->peek($jobId); + $this->assertEquals('I am ready', $peekedJob); - $peekedJob = yield $this->beanstalk->peekReady(); - $this->assertEquals('I am ready', $peekedJob); + $peekedJob = $this->beanstalk->peekReady(); + $this->assertEquals('I am ready', $peekedJob); - list($jobId) = yield $this->beanstalk->reserve(); - $buried = yield $this->beanstalk->bury($jobId); - $this->assertEquals(1, $buried); - $peekedJob = yield $this->beanstalk->peekBuried(); - $this->assertEquals('I am ready', $peekedJob); + list($jobId) = $this->beanstalk->reserve(); + $buried = $this->beanstalk->bury($jobId); + $this->assertEquals(1, $buried); + $peekedJob = $this->beanstalk->peekBuried(); + $this->assertEquals('I am ready', $peekedJob); - $jobId = yield $this->beanstalk->put('I am delayed', 60, 60); - $peekedJob = yield $this->beanstalk->peekDelayed(); - $this->assertEquals('I am delayed', $peekedJob); - })); + $jobId = $this->beanstalk->put('I am delayed', 60, 60); + $peekedJob = $this->beanstalk->peekDelayed(); + $this->assertEquals('I am delayed', $peekedJob); } - public function testKickJob() { - wait(call(function () { - $jobId = yield $this->beanstalk->put("hi"); - $this->assertInternalType("int", $jobId); + public function testKickJob(): void + { + $jobId = $this->beanstalk->put("hi"); + $this->assertIsInt($jobId); - $kicked = yield $this->beanstalk->kickJob($jobId); - $this->assertFalse($kicked); + $kicked = $this->beanstalk->kickJob($jobId); + $this->assertFalse($kicked); - list($jobId, ) = yield $this->beanstalk->reserve(); - $buried = yield $this->beanstalk->bury($jobId); - $this->assertEquals(1, $buried); - /** @var Job $jobStats */ - $jobStats = yield $this->beanstalk->getJobStats($jobId); - $this->assertEquals('buried', $jobStats->state); + list($jobId, ) = $this->beanstalk->reserve(); + $buried = $this->beanstalk->bury($jobId); + $this->assertEquals(1, $buried); + /** @var Job $jobStats */ + $jobStats = $this->beanstalk->getJobStats($jobId); + $this->assertEquals('buried', $jobStats->state); - $kicked = yield $this->beanstalk->kickJob($jobId); - $this->assertTrue($kicked); - })); + $kicked = $this->beanstalk->kickJob($jobId); + $this->assertTrue($kicked); } - public function testKick() { - wait(call(function () { - for ($i = 0; $i < 10; $i++) { - yield $this->beanstalk->put("Job $i"); - } - for ($i = 0; $i < 8; $i++) { - list($jobId, ) = yield $this->beanstalk->reserve(); - $buried = yield $this->beanstalk->bury($jobId); - $this->assertEquals(1, $buried); - } - - $kicked = yield $this->beanstalk->kick(4); - $this->assertEquals(4, $kicked); - - $kicked = yield $this->beanstalk->kick(10); - $this->assertEquals(4, $kicked); - - $kicked = yield $this->beanstalk->kick(1); - $this->assertEquals(0, $kicked); - })); + public function testKick(): void + { + for ($i = 0; $i < 10; $i++) { + $this->beanstalk->put("Job $i"); + } + for ($i = 0; $i < 8; $i++) { + list($jobId, ) = $this->beanstalk->reserve(); + $buried = $this->beanstalk->bury($jobId); + $this->assertEquals(1, $buried); + } + + $kicked = $this->beanstalk->kick(4); + $this->assertEquals(4, $kicked); + + $kicked = $this->beanstalk->kick(10); + $this->assertEquals(4, $kicked); + + $kicked = $this->beanstalk->kick(1); + $this->assertEquals(0, $kicked); } - public function testReservedJobShouldHaveTheSamePayloadAsThePutPayload() { - wait(call(function () { - $jobId = yield $this->beanstalk->put(str_repeat('*', 65535)); + public function testReservedJobShouldHaveTheSamePayloadAsThePutPayload(): void + { + $jobId = $this->beanstalk->put(str_repeat('*', 65535)); - yield $this->beanstalk->watch('default'); - list($reservedJobId, $reservedJobPayload) = yield $this->beanstalk->reserve(); + $this->beanstalk->watch('default'); + list($reservedJobId, $reservedJobPayload) = $this->beanstalk->reserve(); - $this->assertSame($jobId, $reservedJobId); - $this->assertSame(65535, strlen($reservedJobPayload)); - })); + $this->assertSame($jobId, $reservedJobId); + $this->assertSame(65535, strlen($reservedJobPayload)); } } diff --git a/test/ParserTest.php b/test/ParserTest.php index ebf5d24..638f8dd 100644 --- a/test/ParserTest.php +++ b/test/ParserTest.php @@ -5,30 +5,35 @@ use Amp\Beanstalk\Parser; use PHPUnit\Framework\TestCase; -class ParserTest extends TestCase { - protected $parserToTest; +class ParserTest extends TestCase +{ + protected Parser $parserToTest; - protected $parsedElements; + protected mixed $parsedElements = null; - public function setUp() { + public function setUp(): void + { $this->parserToTest = new Parser(function ($result) { $this->parsedElements = $result; }); } - public function testParsesPartialResponseCorrectly() { + public function testParsesPartialResponseCorrectly(): void + { $this->parserToTest->send("OK 5\r\nhello\r"); $this->assertNull($this->parsedElements); $this->parserToTest->send("\n"); $this->assertSame(["OK", "hello"], $this->parsedElements); } - public function testParsesFound() { + public function testParsesFound(): void + { $this->parserToTest->send("FOUND 5 5\r\nhello\r\n"); $this->assertSame(["FOUND", 5, 'hello'], $this->parsedElements); } - public function testParsesReserved() { + public function testParsesReserved(): void + { $this->parserToTest->send("RESERVED 2 30\r\n"); $this->assertNull($this->parsedElements); $this->parserToTest->reset(); @@ -36,7 +41,8 @@ public function testParsesReserved() { $this->assertSame(["RESERVED", 5, 'hello'], $this->parsedElements); } - public function testResetBuffer() { + public function testResetBuffer(): void + { $this->parserToTest->send("OK 7\r\nmorn"); $this->assertNull($this->parsedElements); $this->parserToTest->send("ing\r\n"); @@ -54,12 +60,14 @@ public function testResetBuffer() { /** * @dataProvider dataProviderTestExceptions */ - public function testParserExceptions($buffer, $exceptionExpected) { + public function testParserExceptions($buffer, $exceptionExpected): void + { $this->parserToTest->send($buffer); $this->assertInstanceOf($exceptionExpected, $this->parsedElements); } - public function dataProviderTestExceptions() { + public function dataProviderTestExceptions(): array + { return [ [ "OUT_OF_MEMORY\r\nhello\r",