Skip to content

Commit

Permalink
Merge pull request #1099 from atrauzzi/fix/dead-non-consumer-connections
Browse files Browse the repository at this point in the history
Fix - Add automatic reconnect support for STOMP producers
  • Loading branch information
makasim authored Oct 8, 2020
2 parents b7d5f9e + b46a945 commit 272c2bc
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 16 deletions.
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,11 @@ bin/jp.php
bin/php-parse
bin/google-cloud-batch
bin/thruway
bin/phpstan.phar
bin/var-dump-server
bin/yaml-lint
vendor
var
.php_cs
.php_cs.cache
composer.lock
composer.lock
15 changes: 8 additions & 7 deletions pkg/stomp/StompConnectionFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,14 @@ public function __construct($config = 'stomp:')
*/
public function createContext(): Context
{
if ($this->config['lazy']) {
return new StompContext(
function () { return $this->establishConnection(); },
$this->config['target']
);
}
$stomp = $this->config['lazy']
? function () { return $this->establishConnection(); }
: $this->establishConnection();

$target = $this->config['target'];
$detectTransientConnections = (bool) $this->config['detect_transient_connections'];

return new StompContext($this->establishConnection(), $this->config['target']);
return new StompContext($stomp, $target, $detectTransientConnections);
}

private function establishConnection(): BufferedStompClient
Expand Down Expand Up @@ -169,6 +169,7 @@ private function defaultConfig(): array
'read_timeout' => 60,
'send_heartbeat' => 0,
'receive_heartbeat' => 0,
'detect_transient_connections' => false,
];
}
}
32 changes: 25 additions & 7 deletions pkg/stomp/StompContext.php
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,15 @@ class StompContext implements Context
*/
private $stompFactory;

/**
* @var bool
*/
private $transient;

/**
* @param BufferedStompClient|callable $stomp
*/
public function __construct($stomp, string $extensionType)
public function __construct($stomp, string $extensionType, bool $detectTransientConnections = false)
{
if ($stomp instanceof BufferedStompClient) {
$this->stomp = $stomp;
Expand All @@ -53,6 +58,7 @@ public function __construct($stomp, string $extensionType)

$this->extensionType = $extensionType;
$this->useExchangePrefix = ExtensionType::RABBITMQ === $extensionType;
$this->transient = $detectTransientConnections;
}

/**
Expand Down Expand Up @@ -173,6 +179,8 @@ public function createConsumer(Destination $destination): Consumer
{
InvalidDestinationException::assertDestinationInstanceOf($destination, StompDestination::class);

$this->transient = false;

return new StompConsumer($this->getStomp(), $destination);
}

Expand All @@ -181,6 +189,10 @@ public function createConsumer(Destination $destination): Consumer
*/
public function createProducer(): Producer
{
if ($this->transient && $this->stomp) {
$this->stomp->disconnect();
}

return new StompProducer($this->getStomp());
}

Expand All @@ -202,14 +214,20 @@ public function purgeQueue(Queue $queue): void
public function getStomp(): BufferedStompClient
{
if (false == $this->stomp) {
$stomp = call_user_func($this->stompFactory);
if (false == $stomp instanceof BufferedStompClient) {
throw new \LogicException(sprintf('The factory must return instance of BufferedStompClient. It returns %s', is_object($stomp) ? get_class($stomp) : gettype($stomp)));
}

$this->stomp = $stomp;
$this->stomp = $this->createStomp();
}

return $this->stomp;
}

private function createStomp(): BufferedStompClient
{
$stomp = call_user_func($this->stompFactory);

if (false == $stomp instanceof BufferedStompClient) {
throw new \LogicException(sprintf('The factory must return instance of BufferedStompClient. It returns %s', is_object($stomp) ? get_class($stomp) : gettype($stomp)));
}

return $stomp;
}
}
9 changes: 9 additions & 0 deletions pkg/stomp/Tests/StompConnectionFactoryConfigTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public static function provideConfigs()
'read_timeout' => 60,
'send_heartbeat' => 0,
'receive_heartbeat' => 0,
'detect_transient_connections' => false,
],
];

Expand All @@ -91,6 +92,7 @@ public static function provideConfigs()
'read_timeout' => 60,
'send_heartbeat' => 0,
'receive_heartbeat' => 0,
'detect_transient_connections' => false,
],
];

Expand All @@ -112,6 +114,7 @@ public static function provideConfigs()
'read_timeout' => 60,
'send_heartbeat' => 0,
'receive_heartbeat' => 0,
'detect_transient_connections' => false,
],
];

Expand All @@ -134,6 +137,7 @@ public static function provideConfigs()
'read_timeout' => 60,
'send_heartbeat' => 0,
'receive_heartbeat' => 0,
'detect_transient_connections' => false,
],
];

Expand All @@ -156,6 +160,7 @@ public static function provideConfigs()
'read_timeout' => 60,
'send_heartbeat' => 0,
'receive_heartbeat' => 0,
'detect_transient_connections' => false,
],
];

Expand All @@ -178,6 +183,7 @@ public static function provideConfigs()
'read_timeout' => 60,
'send_heartbeat' => 0,
'receive_heartbeat' => 0,
'detect_transient_connections' => false,
],
];

Expand All @@ -201,6 +207,7 @@ public static function provideConfigs()
'read_timeout' => 60,
'send_heartbeat' => 0,
'receive_heartbeat' => 0,
'detect_transient_connections' => false,
],
];

Expand All @@ -222,6 +229,7 @@ public static function provideConfigs()
'read_timeout' => 60,
'send_heartbeat' => 0,
'receive_heartbeat' => 0,
'detect_transient_connections' => false,
],
];

Expand All @@ -244,6 +252,7 @@ public static function provideConfigs()
'read_timeout' => 60,
'send_heartbeat' => 0,
'receive_heartbeat' => 0,
'detect_transient_connections' => false,
],
];
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/stomp/Tests/StompContextTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public function testShouldCloseConnections()
{
$client = $this->createStompClientMock();
$client
->expects($this->once())
->expects($this->atLeastOnce())
->method('disconnect')
;

Expand Down

0 comments on commit 272c2bc

Please sign in to comment.