Enqueue is an MIT-licensed open source project whose ongoing development is made possible entirely by the support of the community and our customers. If you'd like to join them, please consider supporting the project.
The EnqueueBundle integrates the Enqueue library. It adds an easy-to-use configuration layer, registers services, and adds handy CLI commands.
$ composer require enqueue/enqueue-bundle enqueue/fs
Note: You could use various other transports.
Note: If you are looking for a way to migrate from php-amqplib/rabbitmq-bundle,
read this article.
Then, enable the bundle by adding new \Enqueue\Bundle\EnqueueBundle()
to the bundles array of the registerBundles method in your project's kernel class
(src/Kernel.php in the example below; app/AppKernel.php in older project layouts):
<?php
// src/Kernel.php
namespace App;
use Symfony\Component\HttpKernel\Kernel as BaseKernel;
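class Kernel extends BaseKernel
{
    public function registerBundles()
    {
        $bundles = array(
            // ...
            // The leading backslash matters: without it the class name would be
            // resolved relative to the App namespace declared above.
            new \Enqueue\Bundle\EnqueueBundle(),
        );
        // ...
    }
    // ...
}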
First, you have to configure a transport layer and set one to be the default.
# app/config/config.yml
enqueue:
    default:
        transport: "amqp:"
        client: ~
Once you have configured everything, you can start producing messages:
<?php
use Enqueue\Client\ProducerInterface;
/** @var ProducerInterface $producer **/
$producer = $container->get(ProducerInterface::class);
// send event to many consumers
$producer->sendEvent('aFooTopic', 'Something has happened');
// send command to ONE consumer
$producer->sendCommand('aProcessorName', 'Something has happened');
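The difference between the two calls above is who receives the message: an event goes to every subscriber of the topic, while a command goes to exactly one processor. The following is a minimal, self-contained sketch of that distinction only. It is a toy in-memory model, not how Enqueue routes messages; `ToyRouter`, `subscribe()` and `handle()` are hypothetical names invented for the illustration, and throwing when no handler exists is an assumption of the toy.

```php
<?php
// Toy in-memory model of the two delivery styles. NOT Enqueue's implementation;
// ToyRouter and its methods are hypothetical names used only for illustration.

final class ToyRouter
{
    /** @var array<string, callable[]> topic name => subscriber callbacks */
    private array $topicSubscribers = [];

    /** @var array<string, callable> command name => its single handler */
    private array $commandHandlers = [];

    public function subscribe(string $topic, callable $subscriber): void
    {
        // Any number of subscribers may listen to the same topic.
        $this->topicSubscribers[$topic][] = $subscriber;
    }

    public function handle(string $command, callable $handler): void
    {
        // A command has exactly one handler; a later registration replaces the earlier one.
        $this->commandHandlers[$command] = $handler;
    }

    /** Delivers the event to every subscriber and returns how many received it. */
    public function sendEvent(string $topic, string $body): int
    {
        $subscribers = $this->topicSubscribers[$topic] ?? [];
        foreach ($subscribers as $subscriber) {
            $subscriber($body);
        }

        return count($subscribers);
    }

    /** Delivers the command to its single handler, or fails if none is registered. */
    public function sendCommand(string $command, string $body): void
    {
        if (!isset($this->commandHandlers[$command])) {
            throw new LogicException("No handler registered for command \"$command\"");
        }

        ($this->commandHandlers[$command])($body);
    }
}

$received = [];
$router = new ToyRouter();
$router->subscribe('aFooTopic', function (string $body) use (&$received) {
    $received[] = "mailer: $body";
});
$router->subscribe('aFooTopic', function (string $body) use (&$received) {
    $received[] = "logger: $body";
});
$router->handle('aProcessorName', function (string $body) use (&$received) {
    $received[] = "worker: $body";
});

// The event reaches both subscribers; the command reaches the one handler.
$delivered = $router->sendEvent('aFooTopic', 'Something has happened');
$router->sendCommand('aProcessorName', 'Something has happened');
```

In practice this means a producer of an event does not need to know who is listening, whereas a producer of a command addresses one specific processor by name.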
To consume messages you have to first create a message processor:
<?php
use Interop\Queue\Message;
use Interop\Queue\Context;
use Interop\Queue\Processor;
use Enqueue\Client\TopicSubscriberInterface;
class FooProcessor implements Processor, TopicSubscriberInterface
{
    public function process(Message $message, Context $context)
    {
        echo $message->getBody();

        return self::ACK;
        // return self::REJECT; // when the message is broken
        // return self::REQUEUE; // the message is fine but you want to postpone processing
    }

    public static function getSubscribedTopics()
    {
        return ['aFooTopic'];
    }
}
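The three return values in the processor above map onto three situations: the message was handled, the message is broken, or the message is fine but cannot be handled yet. The sketch below shows one way such a decision could be structured. It is deliberately library-free so it runs on its own: the `ACK`, `REJECT` and `REQUEUE` constants are local stand-ins for the `Processor` constants, and `decideResult()`, the `orderId` field and the `$resourceReady` flag are hypothetical names chosen for the example.

```php
<?php
// Local stand-ins for Processor::ACK, Processor::REJECT and Processor::REQUEUE,
// defined here so the sketch runs without the library installed.
const ACK = 'ack';
const REJECT = 'reject';
const REQUEUE = 'requeue';

/**
 * Hypothetical helper: picks a result for a JSON message body.
 *
 * @param string $body          raw message body
 * @param bool   $resourceReady whether whatever the handler depends on is available right now
 */
function decideResult(string $body, bool $resourceReady): string
{
    $data = json_decode($body, true);

    // A body that is not valid JSON, or lacks the expected field, will never
    // become valid, so retrying it is pointless.
    if (!is_array($data) || !isset($data['orderId'])) {
        return REJECT;
    }

    // The message is fine, but it cannot be handled yet: ask for redelivery.
    if (!$resourceReady) {
        return REQUEUE;
    }

    // ... do the actual work here ...
    return ACK;
}

$broken = decideResult('not json', true);
$postponed = decideResult('{"orderId": 7}', false);
$handled = decideResult('{"orderId": 7}', true);
```

Inside a real processor, the same branching would sit in `process()` and return `self::REJECT`, `self::REQUEUE` or `self::ACK` instead of the stand-in constants.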
Register it as a container service and subscribe to the topic:
foo_message_processor:
    class: 'FooProcessor'
    tags:
        - { name: 'enqueue.topic_subscriber' }
Now you can start consuming messages:
$ ./bin/console enqueue:consume --setup-broker -vvv
Note: Add -vvv to find out what is going on while you are consuming messages. There is a lot of valuable debug info there.