Skip to content

Commit

Permalink
add Producer::selfCheck() function
Browse files Browse the repository at this point in the history
  • Loading branch information
enl committed Sep 8, 2016
1 parent 6ae2b16 commit a5de132
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 18 deletions.
7 changes: 7 additions & 0 deletions src/BatchProducer.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,13 @@

namespace AmqpWorkers;

/**
* BatchProducer treats given message as a list of messages and send them to RabbitMQ channel
*
* @package AmqpWorkers
* @author Alex Panshin <deadyaga@gmail.com>
* @since 1.0
*/
class BatchProducer extends Producer
{
/**
Expand Down
4 changes: 4 additions & 0 deletions src/Consumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
use AmqpWorkers\Definition\Qos;
use AmqpWorkers\Definition\Queue;
use AmqpWorkers\Exception\ConsumerNotProperlyConfigured;
use AmqpWorkers\Exception\ProducerNotProperlyConfigured;
use AmqpWorkers\Worker\WorkerInterface;
use PhpAmqpLib\Connection\AbstractConnection;
use PhpAmqpLib\Message\AMQPMessage;
Expand Down Expand Up @@ -117,6 +118,7 @@ public function produceResult(Producer $producer)
* Starts consumer. By default, this function can be terminated only by Worker's exception
*
* @throws ConsumerNotProperlyConfigured
* @throws ProducerNotProperlyConfigured if given producer is not properly configured
*/
public function run()
{
Expand All @@ -127,6 +129,8 @@ public function run()
throw new ConsumerNotProperlyConfigured('Worker is not defined.');
}

$this->producer && $this->producer->selfCheck();

$wrapper = function (AMQPMessage $message) {
$result = call_user_func($this->worker, $message->getBody());

Expand Down
47 changes: 29 additions & 18 deletions src/Producer.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,7 @@

/**
* Producer is an object that sends something into AMQP exchange.
* It can format given messages somehow,
* has very stupid (for now) batch messages processing
* and can work with both queues and exchanges as a target.
* It can format given messages somehow and can work with both queues and exchanges as a target.
*
* @package AmqpWorkers
* @author Alex Panshin <deadyaga@gmail.com>
Expand Down Expand Up @@ -55,7 +53,7 @@ class Producer
* ```
*
* @param AbstractConnection $connection
* @return Producer
* @return Producer $this
*/
public static function factory(AbstractConnection $connection)
{
Expand Down Expand Up @@ -97,18 +95,10 @@ public function withQueue(Queue $queue)
return $this;
}

/**
* @return bool
*/
protected function isExchange()
{
return $this->isExchange;
}

/**
* @param \Closure|callable $formatter
* @return Producer $this
* @throws \AmqpWorkers\Exception\ProducerNotProperlyConfigured
* @throws ProducerNotProperlyConfigured
*/
public function withFormatter($formatter)
{
Expand All @@ -123,7 +113,7 @@ public function withFormatter($formatter)
/**
* @param mixed $payload
* @todo: maybe add properties array as second parameter
* @throws \AmqpWorkers\Exception\ProducerNotProperlyConfigured if queue nor exchange not given.
* @throws ProducerNotProperlyConfigured if queue nor exchange not given.
*/
public function produce($payload)
{
Expand All @@ -142,16 +132,29 @@ public function produce($payload)
}

/**
* @return AMQPChannel
* @todo: declare queue only once?
* @throws \AmqpWorkers\Exception\ProducerNotProperlyConfigured
* Returns `true` if producer is properly configured. Throws exception otherwise.
* Function is public just because Consumer needs to check if given producer configured before consuming the queue.
*
* @return bool
* @throws ProducerNotProperlyConfigured
*/
protected function getChannel()
public function selfCheck()
{
if ($this->exchange === null && $this->queue === null) {
throw new ProducerNotProperlyConfigured('Nor queue nor exchange given.');
}

return true;
}

/**
* @return AMQPChannel
* @throws ProducerNotProperlyConfigured
*/
protected function getChannel()
{
$this->selfCheck();

$channel = $this->connection->channel();

if ($this->isExchange()) {
Expand Down Expand Up @@ -196,4 +199,12 @@ protected function createMessage($payload)
{
return new AMQPMessage(call_user_func($this->formatter, $payload));
}

/**
* @return bool
*/
protected function isExchange()
{
return $this->isExchange;
}
}
14 changes: 14 additions & 0 deletions tests/ConsumerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
use AmqpWorkers\Consumer;
use AmqpWorkers\Definition\Queue;
use AmqpWorkers\Exception\ConsumerNotProperlyConfigured;
use AmqpWorkers\Exception\ProducerNotProperlyConfigured;
use AmqpWorkers\Producer;
use AmqpWorkers\Worker\ClosureWorker;
use Mockery as m;
use Mockery\MockInterface;
use Mockery\Adapter\Phpunit\MockeryTestCase;
Expand Down Expand Up @@ -49,4 +52,15 @@ public function testEmptyWorker()
Consumer::factory($this->connection)->withQueue(new Queue('test'))->run();
}

public function testNonConfiguredProducer()
{
$this->expectException(ProducerNotProperlyConfigured::class);

Consumer::factory($this->connection)
->withQueue(new Queue('test'))
->withWorker(new ClosureWorker(function($message) {}))
->produceResult(Producer::factory($this->connection))
->run();
}

}

0 comments on commit a5de132

Please sign in to comment.