Skip to content

Commit

Permalink
Merge pull request #152 from jeromegamez/generate-better-connections
Browse files Browse the repository at this point in the history
Generate better connections
  • Loading branch information
WyriHaximus authored May 24, 2024
2 parents 6aeaaec + 00ba835 commit 3a2923f
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 57 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ jobs:
- name: Install Dependencies
uses: ramsey/composer-install@v2
- name: Run PHPStan
run: ./vendor/bin/phpstan analyze src --level 0
run: ./vendor/bin/phpstan analyze src --level 5
test:
name: "Run Tests on PHP ${{ matrix.php }} against RabbitMQ ${{ matrix.rabbitmq }} (Composer: ${{ matrix.composer }}; TLS: ${{ matrix.ssl_test }})"
runs-on: ubuntu-latest
Expand Down
36 changes: 17 additions & 19 deletions spec/generate.php
Original file line number Diff line number Diff line change
@@ -1,14 +1,7 @@
<?php
namespace Bunny;

use Bunny\Exception\ClientException;
use Bunny\Protocol\AbstractFrame;
use Bunny\Protocol\ContentBodyFrame;
use Bunny\Protocol\ContentHeaderFrame;
use Bunny\Protocol\HeartbeatFrame;
use React\EventLoop\Loop;
use React\Promise\Deferred;
use function React\Async\await;

require_once __DIR__ . "/../vendor/autoload.php";

Expand Down Expand Up @@ -336,7 +329,7 @@ function amqpTypeToLength($type, $e)
$connectionContent .= " });\n";
$connectionContent .= " }\n";
$connectionContent .= "\n";
$connectionContent .= " public function disconnect(int \$code, string \$reason)\n";
$connectionContent .= " public function disconnect(int \$code, string \$reason): void\n";
$connectionContent .= " {\n";
$connectionContent .= " \$this->connectionClose(\$code, 0, 0, \$reason);\n";
$connectionContent .= " \$this->connection->close();\n";
Expand All @@ -353,7 +346,7 @@ function amqpTypeToLength($type, $e)
$connectionContent .= " *\n";
$connectionContent .= " * @param AbstractFrame \$frame\n";
$connectionContent .= " */\n";
$connectionContent .= " private function onFrameReceived(AbstractFrame \$frame)\n";
$connectionContent .= " private function onFrameReceived(AbstractFrame \$frame): void\n";
$connectionContent .= " {\n";
$connectionContent .= " if (\$frame instanceof MethodConnectionCloseFrame) {\n";
$connectionContent .= " \$this->disconnect(Constants::STATUS_CONNECTION_FORCED, \"Connection closed by server: ({\$frame->replyCode}) \" . \$frame->replyText);\n";
Expand Down Expand Up @@ -735,8 +728,9 @@ function amqpTypeToLength($type, $e)
$connectionContent .= " \$buffer = \$this->writeBuffer;\n";
if ($class->id === 60 && $method->id === 40) {
$connectionContent .= " \$ck = serialize([\$channel, \$headers, \$exchange, \$routingKey, \$mandatory, \$immediate]);\n";
$connectionContent .= " \$c = isset(\$this->cache[\$ck]) ? \$this->cache[\$ck] : null;\n";
$connectionContent .= " \$flags = 0; \$off0 = 0; \$len0 = 0; \$off1 = 0; \$len1 = 0; \$contentTypeLength = null; \$contentType = null; \$contentEncodingLength = null; \$contentEncoding = null; \$headersBuffer = null; \$deliveryMode = null; \$priority = null; \$correlationIdLength = null; \$correlationId = null; \$replyToLength = null; \$replyTo = null; \$expirationLength = null; \$expiration = null; \$messageIdLength = null; \$messageId = null; \$timestamp = null; \$typeLength = null; \$type = null; \$userIdLength = null; \$userId = null; \$appIdLength = null; \$appId = null; \$clusterIdLength = null; \$clusterId = null;\n";
$connectionContent .= " \$c = \$this->cache[\$ck] ?? null;\n";
$connectionContent .= " \$flags = \$off0 = \$len0 = \$off1 = \$len1 = 0;\n";
$connectionContent .= " \$contentTypeLength = \$contentType = \$contentEncodingLength = \$contentEncoding = \$headersBuffer = \$deliveryMode = \$priority = \$correlationIdLength = \$correlationId = \$replyToLength = \$replyTo = \$expirationLength = \$expiration = \$messageIdLength = \$messageId = \$timestamp = \$typeLength = \$type = \$userIdLength = \$userId = \$appIdLength = \$appId = \$clusterIdLength = \$clusterId = null;\n";
$connectionContent .= " if (\$c) { \$buffer->append(\$c[0]); }\n";
$connectionContent .= " else {\n";
$connectionContent .= " \$off0 = \$buffer->getLength();\n";
Expand Down Expand Up @@ -769,6 +763,7 @@ function amqpTypeToLength($type, $e)

// FIXME: respect max body size agreed upon connection.tune
$connectionContent .= " \$s = 14;\n";
$connectionContent .= "\n";


foreach ([
Expand All @@ -788,9 +783,8 @@ function amqpTypeToLength($type, $e)
] as $flag => $property
) {
list($propertyName, $staticSize, $dynamicSize) = $property;
$connectionContent .= " if (isset(\$headers['{$propertyName}'])) {\n";
$connectionContent .= " if (\$" . lcfirst(dashedToCamel($propertyName)) . " = \$headers['{$propertyName}'] ?? null) {\n";
$connectionContent .= " \$flags |= {$flag};\n";
$connectionContent .= " \$" . lcfirst(dashedToCamel($propertyName)) . " = \$headers['{$propertyName}'];\n";
if ($staticSize) {
$connectionContent .= " \$s += {$staticSize};\n";
}
Expand All @@ -799,6 +793,7 @@ function amqpTypeToLength($type, $e)
}
$connectionContent .= " unset(\$headers['{$propertyName}']);\n";
$connectionContent .= " }\n";
$connectionContent .= "\n";
}

$connectionContent .= " if (!empty(\$headers)) {\n";
Expand Down Expand Up @@ -892,10 +887,13 @@ function amqpTypeToLength($type, $e)
$connectionContent .= " \$deferred = new Deferred();\n";
$connectionContent .= " \$this->awaitList[] = [\n";
$connectionContent .= " 'filter' => function (Protocol\\AbstractFrame \$frame)" . ($class->id !== 10 ? " use (\$channel)" : "") . ": bool {\n";
$connectionContent .= " if (\$frame instanceof Protocol\\{$className}" . ($class->id !== 10 ? " && \$frame->channel === \$channel" : "") . ") {\n";
$connectionContent .= " return true;\n";
$connectionContent .= " }\n";
$connectionContent .= "\n";

if ($class->id !== 10 || $method->id !== 50) {
$connectionContent .= " if (\$frame instanceof Protocol\\{$className}" . ($class->id !== 10 ? " && \$frame->channel === \$channel" : "") . ") {\n";
$connectionContent .= " return true;\n";
$connectionContent .= " }\n";
$connectionContent .= "\n";
}

if ($class->id === 60 && $method->id === 71) {
$connectionContent .= " if (\$frame instanceof Protocol\\" . str_replace("GetOk", "GetEmpty", $className) . ($class->id !== 10 ? " && \$frame->channel === \$channel" : "") . ") {\n";
Expand Down Expand Up @@ -968,7 +966,7 @@ function amqpTypeToLength($type, $e)
$protocolWriterContent .= "}\n";
file_put_contents(__DIR__ . "/../src/Protocol/ProtocolWriterGenerated.php", $protocolWriterContent);

$connectionContent .= " public function startHeathbeatTimer(): void\n";
$connectionContent .= " public function startHeartbeatTimer(): void\n";
$connectionContent .= " {\n";
$connectionContent .= " \$this->heartbeatTimer = Loop::addTimer(\$this->options['heartbeat'], [\$this, 'onHeartbeat']);\n";
$connectionContent .= " \$this->connection->on('drain', [\$this, 'onHeartbeat']);\n";
Expand All @@ -977,7 +975,7 @@ function amqpTypeToLength($type, $e)
$connectionContent .= " /**\n";
$connectionContent .= " * Callback when heartbeat timer timed out.\n";
$connectionContent .= " */\n";
$connectionContent .= " public function onHeartbeat()\n";
$connectionContent .= " public function onHeartbeat(): void\n";
$connectionContent .= " {\n";
$connectionContent .= " \$now = microtime(true);\n";
$connectionContent .= " \$nextHeartbeat = (\$this->lastWrite ?: \$now) + \$this->options['heartbeat'];\n";
Expand Down
2 changes: 1 addition & 1 deletion src/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ public function connect(): self
}
$this->connection->connectionTuneOk($tune->channelMax, $tune->frameMax, (int)$this->options['heartbeat']);
$this->connection->connectionOpen($this->options['vhost']);
$this->connection->startHeathbeatTimer();
$this->connection->startHeartbeatTimer();

$this->state = ClientStateEnum::CONNECTED;
} catch (\Throwable $thrown) {
Expand Down
70 changes: 34 additions & 36 deletions src/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public function __construct(
});
}

public function disconnect(int $code, string $reason)
public function disconnect(int $code, string $reason): void
{
$this->connectionClose($code, 0, 0, $reason);
$this->connection->close();
Expand All @@ -100,7 +100,7 @@ public function disconnect(int $code, string $reason)
*
* @param AbstractFrame $frame
*/
private function onFrameReceived(AbstractFrame $frame)
private function onFrameReceived(AbstractFrame $frame): void
{
if ($frame instanceof MethodConnectionCloseFrame) {
$this->disconnect(Constants::STATUS_CONNECTION_FORCED, "Connection closed by server: ({$frame->replyCode}) " . $frame->replyText);
Expand Down Expand Up @@ -363,10 +363,6 @@ public function awaitConnectionClose(): Protocol\MethodConnectionCloseFrame
$deferred = new Deferred();
$this->awaitList[] = [
'filter' => function (Protocol\AbstractFrame $frame): bool {
if ($frame instanceof Protocol\MethodConnectionCloseFrame) {
return true;
}

if ($frame instanceof Protocol\MethodConnectionCloseFrame) {
$this->connectionCloseOk();
throw new ClientException($frame->replyText, $frame->replyCode);
Expand Down Expand Up @@ -1258,8 +1254,9 @@ public function publish(int $channel, string $body, array $headers = [], string
{
$buffer = $this->writeBuffer;
$ck = serialize([$channel, $headers, $exchange, $routingKey, $mandatory, $immediate]);
$c = isset($this->cache[$ck]) ? $this->cache[$ck] : null;
$flags = 0; $off0 = 0; $len0 = 0; $off1 = 0; $len1 = 0; $contentTypeLength = null; $contentType = null; $contentEncodingLength = null; $contentEncoding = null; $headersBuffer = null; $deliveryMode = null; $priority = null; $correlationIdLength = null; $correlationId = null; $replyToLength = null; $replyTo = null; $expirationLength = null; $expiration = null; $messageIdLength = null; $messageId = null; $timestamp = null; $typeLength = null; $type = null; $userIdLength = null; $userId = null; $appIdLength = null; $appId = null; $clusterIdLength = null; $clusterId = null;
$c = $this->cache[$ck] ?? null;
$flags = $off0 = $len0 = $off1 = $len1 = 0;
$contentTypeLength = $contentType = $contentEncodingLength = $contentEncoding = $headersBuffer = $deliveryMode = $priority = $correlationIdLength = $correlationId = $replyToLength = $replyTo = $expirationLength = $expiration = $messageIdLength = $messageId = $timestamp = $typeLength = $type = $userIdLength = $userId = $appIdLength = $appId = $clusterIdLength = $clusterId = null;
if ($c) { $buffer->append($c[0]); }
else {
$off0 = $buffer->getLength();
Expand All @@ -1274,94 +1271,95 @@ public function publish(int $channel, string $body, array $headers = [], string
$this->writer->appendBits([$mandatory, $immediate], $buffer);
$buffer->appendUint8(206);
$s = 14;
if (isset($headers['content-type'])) {

if ($contentType = $headers['content-type'] ?? null) {
$flags |= 32768;
$contentType = $headers['content-type'];
$s += 1;
$s += $contentTypeLength = strlen($contentType);
unset($headers['content-type']);
}
if (isset($headers['content-encoding'])) {

if ($contentEncoding = $headers['content-encoding'] ?? null) {
$flags |= 16384;
$contentEncoding = $headers['content-encoding'];
$s += 1;
$s += $contentEncodingLength = strlen($contentEncoding);
unset($headers['content-encoding']);
}
if (isset($headers['delivery-mode'])) {

if ($deliveryMode = $headers['delivery-mode'] ?? null) {
$flags |= 4096;
$deliveryMode = $headers['delivery-mode'];
$s += 1;
unset($headers['delivery-mode']);
}
if (isset($headers['priority'])) {

if ($priority = $headers['priority'] ?? null) {
$flags |= 2048;
$priority = $headers['priority'];
$s += 1;
unset($headers['priority']);
}
if (isset($headers['correlation-id'])) {

if ($correlationId = $headers['correlation-id'] ?? null) {
$flags |= 1024;
$correlationId = $headers['correlation-id'];
$s += 1;
$s += $correlationIdLength = strlen($correlationId);
unset($headers['correlation-id']);
}
if (isset($headers['reply-to'])) {

if ($replyTo = $headers['reply-to'] ?? null) {
$flags |= 512;
$replyTo = $headers['reply-to'];
$s += 1;
$s += $replyToLength = strlen($replyTo);
unset($headers['reply-to']);
}
if (isset($headers['expiration'])) {

if ($expiration = $headers['expiration'] ?? null) {
$flags |= 256;
$expiration = $headers['expiration'];
$s += 1;
$s += $expirationLength = strlen($expiration);
unset($headers['expiration']);
}
if (isset($headers['message-id'])) {

if ($messageId = $headers['message-id'] ?? null) {
$flags |= 128;
$messageId = $headers['message-id'];
$s += 1;
$s += $messageIdLength = strlen($messageId);
unset($headers['message-id']);
}
if (isset($headers['timestamp'])) {

if ($timestamp = $headers['timestamp'] ?? null) {
$flags |= 64;
$timestamp = $headers['timestamp'];
$s += 8;
unset($headers['timestamp']);
}
if (isset($headers['type'])) {

if ($type = $headers['type'] ?? null) {
$flags |= 32;
$type = $headers['type'];
$s += 1;
$s += $typeLength = strlen($type);
unset($headers['type']);
}
if (isset($headers['user-id'])) {

if ($userId = $headers['user-id'] ?? null) {
$flags |= 16;
$userId = $headers['user-id'];
$s += 1;
$s += $userIdLength = strlen($userId);
unset($headers['user-id']);
}
if (isset($headers['app-id'])) {

if ($appId = $headers['app-id'] ?? null) {
$flags |= 8;
$appId = $headers['app-id'];
$s += 1;
$s += $appIdLength = strlen($appId);
unset($headers['app-id']);
}
if (isset($headers['cluster-id'])) {

if ($clusterId = $headers['cluster-id'] ?? null) {
$flags |= 4;
$clusterId = $headers['cluster-id'];
$s += 1;
$s += $clusterIdLength = strlen($clusterId);
unset($headers['cluster-id']);
}

if (!empty($headers)) {
$flags |= 8192;
$this->writer->appendTable($headers, $headersBuffer = new Buffer());
Expand Down Expand Up @@ -1849,7 +1847,7 @@ public function awaitConfirmSelectOk(int $channel): Protocol\MethodConfirmSelect
return await($deferred->promise());
}

public function startHeathbeatTimer(): void
public function startHeartbeatTimer(): void
{
$this->heartbeatTimer = Loop::addTimer($this->options['heartbeat'], [$this, 'onHeartbeat']);
$this->connection->on('drain', [$this, 'onHeartbeat']);
Expand All @@ -1858,7 +1856,7 @@ public function startHeathbeatTimer(): void
/**
* Callback when heartbeat timer timed out.
*/
public function onHeartbeat()
public function onHeartbeat(): void
{
$now = microtime(true);
$nextHeartbeat = ($this->lastWrite ?: $now) + $this->options['heartbeat'];
Expand Down

0 comments on commit 3a2923f

Please sign in to comment.