From 2d3833c53876d8034ac81f2e4f398a43e30510af Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Thu, 15 Jun 2017 14:03:02 +0300 Subject: [PATCH 1/3] producerv2 for simple client --- pkg/enqueue/Client/ProducerV2.php | 2 +- pkg/simple-client/SimpleClient.php | 13 +++++++++++++ .../SimpleClientContainerExtension.php | 12 ++++++++++++ 3 files changed, 26 insertions(+), 1 deletion(-) diff --git a/pkg/enqueue/Client/ProducerV2.php b/pkg/enqueue/Client/ProducerV2.php index 9c997b77f..8a69dfd27 100644 --- a/pkg/enqueue/Client/ProducerV2.php +++ b/pkg/enqueue/Client/ProducerV2.php @@ -49,7 +49,7 @@ public function sendCommand($command, $message, $needReply = false) $message->setScope(Message::SCOPE_APP); if ($needReply) { - return $this->rpcClient->callAsync(Config::COMMAND_TOPIC, $message, 60); + return $this->rpcClient->callAsync(Config::COMMAND_TOPIC, $message, 60000); } $this->realProducer->send(Config::COMMAND_TOPIC, $message); diff --git a/pkg/simple-client/SimpleClient.php b/pkg/simple-client/SimpleClient.php index a65b719ee..ecc31aee2 100644 --- a/pkg/simple-client/SimpleClient.php +++ b/pkg/simple-client/SimpleClient.php @@ -11,6 +11,7 @@ use Enqueue\Client\Meta\QueueMetaRegistry; use Enqueue\Client\Meta\TopicMetaRegistry; use Enqueue\Client\ProducerInterface; +use Enqueue\Client\ProducerV2Interface; use Enqueue\Client\RouterProcessor; use Enqueue\Consumption\CallbackProcessor; use Enqueue\Consumption\ExtensionInterface; @@ -185,6 +186,18 @@ public function getProducer($setupBroker = false) return $this->container->get('enqueue.client.producer'); } + /** + * @param bool $setupBroker + * + * @return ProducerV2Interface + */ + public function getProducerV2($setupBroker = false) + { + $setupBroker && $this->setupBroker(); + + return $this->container->get('enqueue.client.producer.v2'); + } + public function setupBroker() { $this->getDriver()->setupBroker(); diff --git a/pkg/simple-client/SimpleClientContainerExtension.php b/pkg/simple-client/SimpleClientContainerExtension.php index 00cab237b..e2d13ddf3 100644 --- a/pkg/simple-client/SimpleClientContainerExtension.php +++ b/pkg/simple-client/SimpleClientContainerExtension.php @@ -93,6 +93,18 @@ public function load(array $configs, ContainerBuilder $container) new Reference('enqueue.client.driver'), ]); + $container->register('enqueue.client.rpc', RpcClient::class) + ->setArguments([ + new Reference('enqueue.client.producer'), + new Reference('enqueue.transport.context') + ]); + + $container->register('enqueue.client.producer.v2', ProducerV2::class) + ->setArguments([ + new Reference('enqueue.client.producer'), + new Reference('enqueue.client.rpc') + ]); + $container->register('enqueue.client.meta.topic_meta_registry', TopicMetaRegistry::class) ->setArguments([[]]); From b317f2965d7aefaa9c9ccac6c3d15b500112436b Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Thu, 15 Jun 2017 15:10:28 +0300 Subject: [PATCH 2/3] add receive timeout argument --- pkg/enqueue/Client/RpcClient.php | 9 ++++--- pkg/enqueue/Rpc/Promise.php | 13 +++++---- pkg/enqueue/Rpc/RpcClient.php | 9 ++++--- pkg/enqueue/Tests/Client/RpcClientTest.php | 3 ++- pkg/enqueue/Tests/Rpc/PromiseTest.php | 27 ++++++++++++++++++- pkg/enqueue/Tests/Rpc/RpcClientTest.php | 3 ++- .../SimpleClientContainerExtension.php | 4 +-- 7 files changed, 50 insertions(+), 18 deletions(-) diff --git a/pkg/enqueue/Client/RpcClient.php b/pkg/enqueue/Client/RpcClient.php index 440c8f59b..6ca123a6a 100644 --- a/pkg/enqueue/Client/RpcClient.php +++ b/pkg/enqueue/Client/RpcClient.php @@ -78,12 +78,13 @@ public function callAsync($topic, $message, $timeout) $correlationId = $message->getCorrelationId(); - $receive = function () use ($replyQueue, $timeout, $correlationId) { - $endTime = time() + ((int) ($timeout / 1000)); + $receive = function (Promise $promise, $promiseTimeout) use ($replyQueue, $timeout, $correlationId) { + $runTimeout = $promiseTimeout ?: $timeout; + $endTime = time() + ((int) ($runTimeout / 1000)); $consumer = $this->context->createConsumer($replyQueue); do { - if ($message = $consumer->receive($timeout)) { + if ($message = $consumer->receive($runTimeout)) { if ($message->getCorrelationId() === $correlationId) { $consumer->acknowledge($message); @@ -94,7 +95,7 @@ public function callAsync($topic, $message, $timeout) } } while (time() < $endTime); - throw TimeoutException::create($timeout, $correlationId); + throw TimeoutException::create($runTimeout, $correlationId); }; $receiveNoWait = function () use ($replyQueue, $correlationId) { diff --git a/pkg/enqueue/Rpc/Promise.php b/pkg/enqueue/Rpc/Promise.php index 53c84b498..f716f3950 100644 --- a/pkg/enqueue/Rpc/Promise.php +++ b/pkg/enqueue/Rpc/Promise.php @@ -62,15 +62,17 @@ public function getMessage() /** * Blocks until message received or timeout expired. * + * @param int $timeout + * * @throws TimeoutException if the wait timeout is reached * * @return PsrMessage */ - public function receive() + public function receive($timeout = null) { if (null == $this->message) { try { - if ($message = $this->doReceive($this->receiveCallback)) { + if ($message = $this->doReceive($this->receiveCallback, $this, $timeout)) { $this->message = $message; } } finally { @@ -89,7 +91,7 @@ public function receive() public function receiveNoWait() { if (null == $this->message) { - if ($message = $this->doReceive($this->receiveNoWaitCallback)) { + if ($message = $this->doReceive($this->receiveNoWaitCallback, $this)) { $this->message = $message; call_user_func($this->finallyCallback, $this); @@ -119,12 +121,13 @@ public function isDeleteReplyQueue() /** * @param \Closure $cb + * @param array $args * * @return PsrMessage */ - private function doReceive(\Closure $cb) + private function doReceive(\Closure $cb, ...$args) { - $message = call_user_func($cb, $this); + $message = call_user_func_array($cb, $args); if (null !== $message && false == $message instanceof PsrMessage) { throw new \RuntimeException(sprintf( diff --git a/pkg/enqueue/Rpc/RpcClient.php b/pkg/enqueue/Rpc/RpcClient.php index 87d65bf15..578ca289c 100644 --- a/pkg/enqueue/Rpc/RpcClient.php +++ b/pkg/enqueue/Rpc/RpcClient.php @@ -66,12 +66,13 @@ public function callAsync(PsrDestination $destination, PsrMessage $message, $tim $correlationId = $message->getCorrelationId(); - $receive = function () use ($replyQueue, $timeout, $correlationId) { - $endTime = time() + ((int) ($timeout / 1000)); + $receive = function (Promise $promise, $promiseTimeout) use ($replyQueue, $timeout, $correlationId) { + $runTimeout = $promiseTimeout ?: $timeout; + $endTime = time() + ((int) ($runTimeout / 1000)); $consumer = $this->context->createConsumer($replyQueue); do { - if ($message = $consumer->receive($timeout)) { + if ($message = $consumer->receive($runTimeout)) { if ($message->getCorrelationId() === $correlationId) { $consumer->acknowledge($message); @@ -82,7 +83,7 @@ public function callAsync(PsrDestination $destination, PsrMessage $message, $tim } } while (time() < $endTime); - throw TimeoutException::create($timeout, $correlationId); + throw TimeoutException::create($runTimeout, $correlationId); }; $receiveNoWait = function () use ($replyQueue, $correlationId) { diff --git a/pkg/enqueue/Tests/Client/RpcClientTest.php b/pkg/enqueue/Tests/Client/RpcClientTest.php index 37311ba1d..3b75bc56f 100644 --- a/pkg/enqueue/Tests/Client/RpcClientTest.php +++ b/pkg/enqueue/Tests/Client/RpcClientTest.php @@ -174,6 +174,7 @@ public function testShouldReceiveMessageAndAckMessageIfCorrelationEquals() $consumer ->expects($this->once()) ->method('receive') + ->with(12345) ->willReturn($receivedMessage) ; $consumer @@ -202,7 +203,7 @@ public function testShouldReceiveMessageAndAckMessageIfCorrelationEquals() $rpc = new RpcClient($this->createProducerMock(), $context); - $rpc->callAsync('topic', $message, 2)->receive(); + $rpc->callAsync('topic', $message, 2)->receive(12345); } public function testShouldReceiveNoWaitMessageAndAckMessageIfCorrelationEquals() diff --git a/pkg/enqueue/Tests/Rpc/PromiseTest.php b/pkg/enqueue/Tests/Rpc/PromiseTest.php index ffc729500..2018bc27a 100644 --- a/pkg/enqueue/Tests/Rpc/PromiseTest.php +++ b/pkg/enqueue/Tests/Rpc/PromiseTest.php @@ -29,14 +29,39 @@ public function testCouldSetGetDeleteReplyQueue() public function testOnReceiveShouldCallReceiveCallBack() { $receiveInvoked = false; - $receivecb = function () use (&$receiveInvoked) { + $receivePromise = null; + $receiveTimeout = null; + $receivecb = function ($promise, $timout) use (&$receiveInvoked, &$receivePromise, &$receiveTimeout) { $receiveInvoked = true; + $receivePromise = $promise; + $receiveTimeout = $timout; }; $promise = new Promise($receivecb, function () {}, function () {}); $promise->receive(); $this->assertTrue($receiveInvoked); + $this->assertInstanceOf(Promise::class, $receivePromise); + $this->assertNull($receiveTimeout); + } + + public function testOnReceiveShouldCallReceiveCallBackWithTimeout() + { + $receiveInvoked = false; + $receivePromise = null; + $receiveTimeout = null; + $receivecb = function ($promise, $timout) use (&$receiveInvoked, &$receivePromise, &$receiveTimeout) { + $receiveInvoked = true; + $receivePromise = $promise; + $receiveTimeout = $timout; + }; + + $promise = new Promise($receivecb, function () {}, function () {}); + $promise->receive(12345); + + $this->assertTrue($receiveInvoked); + $this->assertInstanceOf(Promise::class, $receivePromise); + $this->assertSame(12345, $receiveTimeout); } public function testOnReceiveNoWaitShouldCallReceiveNoWaitCallBack() diff --git a/pkg/enqueue/Tests/Rpc/RpcClientTest.php b/pkg/enqueue/Tests/Rpc/RpcClientTest.php index 068956879..13f6779ae 100644 --- a/pkg/enqueue/Tests/Rpc/RpcClientTest.php +++ b/pkg/enqueue/Tests/Rpc/RpcClientTest.php @@ -114,6 +114,7 @@ public function testShouldReceiveMessageAndAckMessageIfCorrelationEquals() $consumer ->expects($this->once()) ->method('receive') + ->with(12345) ->willReturn($receivedMessage) ; $consumer @@ -147,7 +148,7 @@ public function testShouldReceiveMessageAndAckMessageIfCorrelationEquals() $rpc = new RpcClient($context); - $rpc->callAsync($queue, $message, 2)->receive(); + $rpc->callAsync($queue, $message, 2)->receive(12345); } public function testShouldReceiveNoWaitMessageAndAckMessageIfCorrelationEquals() diff --git a/pkg/simple-client/SimpleClientContainerExtension.php b/pkg/simple-client/SimpleClientContainerExtension.php index e2d13ddf3..051deb843 100644 --- a/pkg/simple-client/SimpleClientContainerExtension.php +++ b/pkg/simple-client/SimpleClientContainerExtension.php @@ -96,13 +96,13 @@ public function load(array $configs, ContainerBuilder $container) $container->register('enqueue.client.rpc', RpcClient::class) ->setArguments([ new Reference('enqueue.client.producer'), - new Reference('enqueue.transport.context') + new Reference('enqueue.transport.context'), ]); $container->register('enqueue.client.producer.v2', ProducerV2::class) ->setArguments([ new Reference('enqueue.client.producer'), - new Reference('enqueue.client.rpc') + new Reference('enqueue.client.rpc'), ]); $container->register('enqueue.client.meta.topic_meta_registry', TopicMetaRegistry::class) From 6572cffb07bd4cd395a80f9f03ac9d4827241086 Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Fri, 16 Jun 2017 12:25:58 +0300 Subject: [PATCH 3/3] fix container extension --- pkg/simple-client/SimpleClientContainerExtension.php | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/simple-client/SimpleClientContainerExtension.php b/pkg/simple-client/SimpleClientContainerExtension.php index 051deb843..bfe31ddc4 100644 --- a/pkg/simple-client/SimpleClientContainerExtension.php +++ b/pkg/simple-client/SimpleClientContainerExtension.php @@ -10,7 +10,9 @@ use Enqueue\Client\Meta\QueueMetaRegistry; use Enqueue\Client\Meta\TopicMetaRegistry; use Enqueue\Client\Producer; +use Enqueue\Client\ProducerV2; use Enqueue\Client\RouterProcessor; +use Enqueue\Client\RpcClient; use Enqueue\Consumption\ChainExtension as ConsumptionChainExtension; use Enqueue\Consumption\QueueConsumer; use Enqueue\Symfony\TransportFactoryInterface;