Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add QoS support for listen function #13

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 15 additions & 9 deletions src/Listener.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
/**
* Listener Class, used to manage listening between RabbitMQ server
*
* @author Victor Cruz <cruzrosario@gmail.com>
* @author Victor Cruz <cruzrosario@gmail.com>
*/
class Listener extends BaseOptions {

Expand Down Expand Up @@ -62,12 +62,12 @@ public function __construct(Repository $config, array $options = NULL)
*
* @return void
*/
public function listen($queue_name, array $options = null, Closure $closure)
public function listen($queue_name, array $options = null, $qos_prefetch_size = null, $qos_prefetch_count = null, Closure $closure)
{
$this->queue_name = $queue_name;

if ($options)
$this->setOptions($options);
$this->setOptions($options);

$GLOBALS['messages_proccesed'] = 0;
$GLOBALS['start_time'] = time();
Expand All @@ -77,12 +77,18 @@ public function listen($queue_name, array $options = null, Closure $closure)

$listenerObject = $this;

$connection->channel->basic_qos(
$qos_prefetch_size,
$qos_prefetch_count,
false
);

$connection->channel->basic_consume($this->queue_name, $connection->consumer_tag, false, false, false, false, function ($msg) use ($closure, $listenerObject) {

try
{
$closure($msg->body);
}
}
catch (Exception $e)
{
throw $e;
Expand All @@ -93,10 +99,10 @@ public function listen($queue_name, array $options = null, Closure $closure)
//Update counters
$GLOBALS['messages_proccesed']++;

//Check if necesary to close consumer
//Check if necesary to close consumer
if ($listenerObject->message_limit && $GLOBALS['messages_proccesed'] >= $listenerObject->message_limit)
$msg->delivery_info['channel']->basic_cancel($msg->delivery_info['consumer_tag']);

if ($listenerObject->time && (time()-$GLOBALS['start_time']>= $listenerObject->time))
$msg->delivery_info['channel']->basic_cancel($msg->delivery_info['consumer_tag']);
});
Expand All @@ -119,7 +125,7 @@ public function listen($queue_name, array $options = null, Closure $closure)
catch (Exception $e)
{
throw $e;
}
}
}
}

}
31 changes: 24 additions & 7 deletions src/Tail.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
/**
* Tail class, used as facade handler
*
* @author Victor Cruz <cruzrosario@gmail.com>
* @author Victor Cruz <cruzrosario@gmail.com>
*/
class Tail {

Expand Down Expand Up @@ -41,29 +41,46 @@ public function createMessage()
* Listen queue server for given queue name
*
* @param string $queue_name Queue name to listen
* @param array $options Options to listen
*
* @return void
*/
public function listen($queue_name, Closure $callback)
{
$listener = App::make('Mookofe\Tail\Listener');
$listener->listen($queue_name, null, $callback);
$listener->listen($queue_name, null, null, null, $callback);
}

/**
* Listen queue server for given queue name
*
* @param string $queue_name Queue name to listen
* @param int $qos_prefetch_size QoS pre-fetch size
* @param int $qos_prefetch_count QoS pre-fetch count
* @param Closure $closure Function to run for every message
*
* @return void
*/
public function listenWithQoS($queue_name, $qos_prefetch_size = null, $qos_prefetch_count = null, Closure $callback)
{
$listener = App::make('Mookofe\Tail\Listener');
$listener->listen($queue_name, null, $qos_prefetch_size, $qos_prefetch_count, $callback);
}

/**
* Listen queue server for given queue name
*
* @param string $queue_name Queue name to listen
* @param array $options Options to listen
* @param int $qos_prefetch_size QoS pre-fetch size
* @param int $qos_prefetch_count QoS pre-fetch count
* @param Closure $closure Function to run for every message
*
* @return void
*/
public function listenWithOptions($queue_name, array $options, Closure $callback)
public function listenWithOptions($queue_name, array $options, $qos_prefetch_size = null, $qos_prefetch_count = null, Closure $callback)
{
$listener = App::make('Mookofe\Tail\Listener');
$listener->listen($queue_name, $options, $callback);
$listener->listen($queue_name, $options, $qos_prefetch_size, $qos_prefetch_count, $callback);
}
}

}