This repository has been archived by the owner on Jul 3, 2024. It is now read-only.

Documentation - Jobs

Every job must implement the interface SlmQueue\Job\JobInterface. For convenience, the abstract class SlmQueue\Job\AbstractJob implements most of the interface methods; only execute() must be implemented in userland code.

An example job would look like the following:

namespace MyModule\Job;

use SlmQueue\Job\AbstractJob;

class PrintHelloWorldJob extends AbstractJob
{
    public function execute()
    {
        echo 'Hello World';
    }
}

Job payload

The job often needs some data to work with, commonly called a payload. For an email job, the contents of the email (to address, subject and message) are the payload. If you generate an invoice based on an order in your database, you set the id of this order as the payload.

Internally, SlmQueue\Job\AbstractJob extends the Laminas\Stdlib\Message class, which provides setContent() and getContent(). These methods set the payload when the job is pushed and retrieve it when the job is executed.

The following example sets the payload in a controller:

namespace MyModule\Controller;

use Laminas\Mvc\Controller\AbstractActionController;

class MyController extends AbstractActionController
{
    public function fooAction()
    {
        // Do some work

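        // $this->jobManager and $this->queue are assumed to have been
        // injected into the controller, for example via a factory.
        $job = $this->jobManager->get('MyModule\Job\SendEmailJob');
        $job->setContent([
            'to'      => 'bob@acme.com',
            'subject' => 'Registration completed',
            'message' => 'Hi bob, you just registered for our website! Welcome!'
        ]);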

        $this->queue->push($job);
    }
}

Then you can fetch this payload from the job itself:

namespace MyModule\Job;

use SlmQueue\Job\AbstractJob;

class SendEmailJob extends AbstractJob
{
    public function execute()
    {
        $payload = $this->getContent();
        $to      = $payload['to'];
        $subject = $payload['subject'];
        $message = $payload['message'];

        // Do something with $to, $subject, $message
    }
}

Job payloads are a flexible array structure and are serialized automatically by SlmQueue. This means a payload can contain values such as a DateTime object, which will be serialized and deserialized in the background. The restriction is that all payload data must be serializable. Doctrine 2 entities, references to Laminas\Mvc\Application and other unserializable instances should not be set as payload.
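
The serializability restriction can be checked before a job is pushed. The sketch below is a minimal illustration that assumes PHP's native serialize() as a stand-in for whatever mechanism SlmQueue uses internally; isSerializablePayload() is a hypothetical helper and not part of SlmQueue.

```php
<?php

/**
 * Hypothetical helper (not part of SlmQueue): reports whether a payload
 * survives a native serialize()/unserialize() round trip unchanged.
 */
function isSerializablePayload(array $payload): bool
{
    try {
        return unserialize(serialize($payload)) == $payload;
    } catch (\Throwable $e) {
        // For example: "Serialization of 'Closure' is not allowed"
        return false;
    }
}

// A scalar and a date object both round-trip cleanly.
$good = [
    'order_id' => 42,
    'due'      => new DateTimeImmutable('2024-01-31 12:00:00', new DateTimeZone('UTC')),
];

// A closure stands in here for any unserializable instance.
$bad = [
    'callback' => function () {
        return 'closures cannot be serialized';
    },
];

var_dump(isSerializablePayload($good)); // bool(true)
var_dump(isSerializablePayload($bad));  // bool(false)
```

A check like this is cheap to run in a unit test for each job type, which surfaces an unserializable payload before it reaches the queue.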

Job dependencies

Because jobs are created through a job plugin manager, you can inject the dependencies a job needs through its constructor. This requires a factory for that job as well. The example below sends an email in a job, which requires the email transport class as a dependency.

namespace MyModule\Job;

use SlmQueue\Job\AbstractJob;
use Laminas\Mail\Transport\TransportInterface;
use Laminas\Mail\Message;

class SendEmailJob extends AbstractJob
{
    protected $transport;

    public function __construct(TransportInterface $transport)
    {
        $this->transport = $transport;
    }

    public function execute()
    {
        $message = new Message;
        $payload = $this->getContent();

        $message->setTo($payload['to']);
        $message->setSubject($payload['subject']);
        $message->setBody($payload['message']);

        $this->transport->send($message);
    }
}

To inject the email transport instance, a factory must be created to instantiate the job:

namespace MyModule\Factory;

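use Laminas\ServiceManager\Factory\FactoryInterface;
use MyModule\Job\SendEmailJob;
// Assumption: a laminas-servicemanager release whose FactoryInterface accepts
// the PSR-11 interface; older releases type-hint Interop\Container\ContainerInterface.
use Psr\Container\ContainerInterface;

class SendEmailJobFactory implements FactoryInterface
{
    public function __invoke(ContainerInterface $container, $requestedName, ?array $options = null)
    {
        // The container is queried directly; ContainerInterface has no
        // getServiceLocator() method.
        $transport = $container->get('my-transport-service');

        return new SendEmailJob($transport);
    }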
}

The last step is to register this factory for the job. For this to work, the factory must be registered under the fully qualified class name (FQCN) of the job class.

'slm_queue' => [
    'job_manager' => [
        'factories' => [
            'MyModule\Job\SendEmailJob' => 'MyModule\Factory\SendEmailJobFactory',
        ],
    ],
]
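
As a variation on the registration above, the same keys can be written with ::class constants, which expand to the FQCN strings. This is only a sketch: the $config variable exists so the fragment can run on its own, and in a module configuration file the array would normally be returned instead.

```php
<?php

use MyModule\Factory\SendEmailJobFactory;
use MyModule\Job\SendEmailJob;

// ::class is resolved to a plain string at compile time, so the classes
// do not need to be loadable when the configuration is read.
$config = [
    'slm_queue' => [
        'job_manager' => [
            'factories' => [
                SendEmailJob::class => SendEmailJobFactory::class,
            ],
        ],
    ],
];
```

The resulting array is identical to the string-based version, so either form can be used with the job plugin manager.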

Jobs with binary payload

By default, SlmQueue will fail if your job contains a binary payload. If you want to include a binary payload for your job, your job can implement the SlmQueue\Queue\BinaryMessageInterface.

For example, when you want to send an email with a binary attachment:

namespace MyModule\Job;

use SlmQueue\Job\AbstractJob;
use SlmQueue\Queue\BinaryMessageInterface;
use Laminas\Mail\Transport\TransportInterface;
use Laminas\Mail\Message;
use Laminas\Mime;

class SendEmailWithAttachmentJob extends AbstractJob implements BinaryMessageInterface
{
    protected $transport;

    public function __construct(TransportInterface $transport)
    {
        $this->transport = $transport;
    }

    public function execute()
    {
        $message = new Message;
        $payload = $this->getContent();

        $message->setTo($payload['to']);
        $message->setSubject($payload['subject']);

        $text = new Mime\Part($payload['content']);
        $text->type = 'text/plain';

        $attachment = new Mime\Part($payload['attachment']);
        $attachment->filename = $payload['attachment_filename'];
        $attachment->type = $payload['attachment_type'];

        $mimeMessage = new Mime\Message([$text, $attachment]);

        $message->setBody($mimeMessage);

        $this->transport->send($message);
    }
}

If you have configured the job with a factory, you will now be able to queue jobs with binary contents:

$job = $jobManager->get('MyModule\Job\SendEmailWithAttachmentJob');

$job->setContent([
    'to' => 'info@example.com',
    'subject' => 'Email with attachment',
    'content' => 'Hello, this is an email with an attachment',
    'attachment' => file_get_contents('some/image/file.png'),
    'attachment_type' => 'image/png',
    'attachment_filename' => 'cat.png'
]);

$queue->push($job);

Push jobs via the controller plugin

The examples above all push jobs using the queue directly. This requires SlmQueue users to inject the queue, or the queue plugin manager, into the controller. To avoid duplicating the code for queue injection and job retrieval, SlmQueue provides a queue controller plugin. The controller plugin allows users to push a job by its service name.

namespace MyModule\Controller;

use Laminas\Mvc\Controller\AbstractActionController;

class MyController extends AbstractActionController
{
    public function fooAction()
    {
        // Do some work

        $this->queue('default')
             ->push('MyModule\Job\SendEmailJob', [
                 'to'      => 'bob@acme.com',
                 'subject' => 'Registration completed',
                 'message' => 'Hi bob, you just registered for our website! Welcome!'
             ]);
    }
}

The second parameter of the plugin's push() is optional, so you can omit the array completely. It is also possible to push multiple jobs into the same queue, or to push a job into another queue:

namespace MyModule\Controller;

use Laminas\Mvc\Controller\AbstractActionController;

class MyController extends AbstractActionController
{
    public function fooAction()
    {
        // Do some work

        // Push two jobs into the same queue
        $this->queue('default')
             ->push('MyModule\Job\SendEmailJob', [
                 'to'      => 'bob@acme.com',
                 'subject' => 'Registration completed',
                 'message' => 'Hi bob, you just registered for our website! Welcome!'
             ])
             ->push('MyModule\Job\TweetJob', [
                 'message' => 'We have a new user registered, his name is bob!'
             ]);

        // Push into another queue
        $this->queue('background')
             ->push('OtherModule\Job\BusinessReportJob');
    }
}

Job status codes

When using events, you might want to react to the outcome of a job: was it executed successfully, or were there errors? The result of a job is expressed in its status code. SlmQueue defines the following default status codes:

  1. JOB_STATUS_UNKNOWN
  2. JOB_STATUS_SUCCESS
  3. JOB_STATUS_FAILURE
  4. JOB_STATUS_FAILURE_RECOVERABLE

The status code is stored in the ProcessJobEvent object (see the Events section for details). Normally, when a job runs to completion, the status is success. If any exception is thrown, the status is set to failure.

use SlmQueue\Job\AbstractJob;

class SuccessfulJob extends AbstractJob
{
    public function execute()
    {
        // all is OK
    }
}

use RuntimeException;
use SlmQueue\Job\AbstractJob;

class FailingJob extends AbstractJob
{
    public function execute()
    {
        throw new RuntimeException('Not going well');
    }
}
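
The rule described above (normal completion means success, a thrown exception means failure) can be sketched without SlmQueue. This is an illustration only and not SlmQueue's worker code: StatusSketch and runAndReport() are hypothetical names, and the constant values are made up for this example.

```php
<?php

/**
 * Hypothetical stand-in for the status constants. The values are
 * illustrative and are not taken from SlmQueue.
 */
final class StatusSketch
{
    public const SUCCESS = 'success';
    public const FAILURE = 'failure';
}

/**
 * Runs a job body and reports its status: normal completion is a
 * success, any thrown exception is a failure.
 */
function runAndReport(callable $execute): string
{
    try {
        $execute();
        return StatusSketch::SUCCESS;
    } catch (\Exception $e) {
        return StatusSketch::FAILURE;
    }
}

$ok = runAndReport(function (): void {
    // all is OK
});

$failed = runAndReport(function (): void {
    throw new RuntimeException('Not going well');
});

echo $ok, PHP_EOL;     // success
echo $failed, PHP_EOL; // failure
```

Because the exception is caught and turned into a status, the caller keeps running after a failed job, which matches the worker behaviour described in the troubleshooting notes.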

Troubleshooting

Issue: My job is failing but the worker keeps on going. How can I get more information?

Potential exception: JOB_SIZE_TOO_LARGE

Cause: your job is passing a large object or collection of results as its payload. Some queue backends cap the size of a job; Beanstalkd, for example, has an upper limit.

Diagnostic process: Add try / catch handling in your job around the code that uses the QueueAware queue to push a new job. This is where you will discover that an exception is thrown but never displayed by the worker.

Solution: Decide what can and cannot be pushed in a job based on the middleware stack that you are using with SlmQueue. If you need to move large byte streams (e.g., a massive collection of results), a caching layer like Redis may suit your purposes. If the data must be more persistent, AWS S3 or clustered storage may be a good candidate. Your job should then carry a reference to the byte stream, which the receiving job fetches and uses, rather than the byte stream itself.

Side note: a recurring clean-up job may need to be written if the byte stream is intended to be ephemeral and the receiving job fails for some reason.
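
The pass-a-reference approach can be sketched as follows. This is a minimal illustration under stated assumptions: a local temp file stands in for Redis, S3, or clustered storage, and storeBlob(), fetchBlob(), and the results_ref key are hypothetical names that do not come from SlmQueue.

```php
<?php

/**
 * Hypothetical helper: writes a large blob to a temp file (a stand-in
 * for Redis, S3, or clustered storage) and returns a reference to it.
 */
function storeBlob(string $bytes): string
{
    $path = tempnam(sys_get_temp_dir(), 'job_');
    if ($path === false || file_put_contents($path, $bytes) === false) {
        throw new RuntimeException('Could not store blob');
    }

    return $path;
}

/** Hypothetical helper: loads the blob back and removes it afterwards. */
function fetchBlob(string $ref): string
{
    $bytes = file_get_contents($ref);
    if ($bytes === false) {
        throw new RuntimeException('Could not read blob: ' . $ref);
    }
    unlink($ref);

    return $bytes;
}

// Producer side: keep the payload small by queueing only the reference.
$results = str_repeat('x', 1000000); // about 1 MB of pretend results
$payload = ['results_ref' => storeBlob($results)];

// Consumer side, for example inside execute(): resolve the reference.
$restored = fetchBlob($payload['results_ref']);

var_dump(strlen(serialize($payload)) < 1024); // bool(true)
var_dump($restored === $results);             // bool(true)
```

A temp file only works when producer and worker share a filesystem; with workers on other hosts, the same two helpers would wrap a shared store instead. If fetchBlob() is never reached because the receiving job fails, the stored blob remains, which is the case the recurring clean-up job is meant to cover.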
