Skip to content
This repository has been archived by the owner on Feb 13, 2020. It is now read-only.

Commit

Permalink
Merge pull request #2 from fieg/job-buried-event
Browse files Browse the repository at this point in the history
Added job buried event
  • Loading branch information
fieg committed Aug 27, 2015
2 parents 04b6de5 + bd12d7e commit 57b2f0b
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 5 deletions.
11 changes: 6 additions & 5 deletions src/TreeHouse/WorkerBundle/Command/RunCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ protected function configure()
->addOption('max-memory', 'm', InputOption::VALUE_OPTIONAL, 'Maximum amount of memory to use (in MB). The worker will try to stop before this limit is reached. Set to 0 for infinite.', 0)
->addOption('max-time', 't', InputOption::VALUE_OPTIONAL, 'Maximum running time in seconds. Set to 0 for infinite', 0)
->addOption('batch-size', 'b', InputOption::VALUE_OPTIONAL, 'Number of jobs to process before completing a batch', 15)
->addOption('min-duration', 'd', InputOption::VALUE_OPTIONAL, 'Number of seconds to the worker process should minimal take to run', 15)
;
}

Expand All @@ -59,10 +60,11 @@ protected function execute(InputInterface $input, OutputInterface $output)

$dispatcher = $this->manager->getDispatcher();

$maxMemory = intval($input->getOption('max-memory')) * 1024 * 1024;
$maxTime = intval($input->getOption('max-time'));
$maxJobs = intval($input->getOption('limit'));
$batchSize = intval($input->getOption('batch-size'));
$maxMemory = intval($input->getOption('max-memory')) * 1024 * 1024;
$maxTime = intval($input->getOption('max-time'));
$maxJobs = intval($input->getOption('limit'));
$batchSize = intval($input->getOption('batch-size'));
$minDuration = intval($input->getOption('min-duration'));

$logger = $this->manager->getLogger();
if (($logger instanceof Monolog\Logger) && false === $this->hasConsoleHandler($logger)) {
Expand All @@ -73,7 +75,6 @@ protected function execute(InputInterface $input, OutputInterface $output)
$this->watchActions($input->getOption('action'), $input->getOption('exclude'));

$start = time();
$minDuration = 15;
$jobsCompleted = 0;

// wait for job, timeout after 1 minute
Expand Down
66 changes: 66 additions & 0 deletions src/TreeHouse/WorkerBundle/Event/JobBuriedEvent.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
<?php

namespace TreeHouse\WorkerBundle\Event;

use Pheanstalk\Job;
use Symfony\Component\EventDispatcher\Event;

class JobBuriedEvent extends Event
{
/**
* Number of attempts before job got buried
*
* @var int
*/
protected $attempts;

/**
* @var Job
*/
protected $job;

/**
* Exception that caused the job to be buried
*
* @var \Exception
*/
protected $exception;

/**
* JobBuriedEvent constructor.
*
* @param Job $job
* @param \Exception $exception
* @param int $attempts
*/
public function __construct(Job $job, \Exception $exception, $attempts)
{
$this->job = $job;
$this->exception = $exception;
$this->attempts = $attempts;
}

/**
* @return Job
*/
public function getJob()
{
return $this->job;
}

/**
* @return \Exception
*/
public function getException()
{
return $this->exception;
}

/**
* @return int
*/
public function getAttempts()
{
return $this->attempts;
}
}
3 changes: 3 additions & 0 deletions src/TreeHouse/WorkerBundle/QueueManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
use Symfony\Component\OptionsResolver\Exception\ExceptionInterface;
use Symfony\Component\OptionsResolver\OptionsResolver;
use TreeHouse\WorkerBundle\Event\ExecutionEvent;
use TreeHouse\WorkerBundle\Event\JobBuriedEvent;
use TreeHouse\WorkerBundle\Event\JobEvent;
use TreeHouse\WorkerBundle\Exception\AbortException;
use TreeHouse\WorkerBundle\Exception\RescheduleException;
Expand Down Expand Up @@ -503,6 +504,8 @@ public function executeJob(Job $job, $maxRetries = 1)
if ($releases > $maxRetries) {
// no more retries, bury job for manual inspection
$this->bury($job);

$this->dispatcher->dispatch(WorkerEvents::JOB_BURIED_EVENT, new JobBuriedEvent($job, $e, $releases));
} else {
// try again, regardless of the error
$this->reschedule($job, new \DateTime('+10 minutes'));
Expand Down
6 changes: 6 additions & 0 deletions src/TreeHouse/WorkerBundle/WorkerEvents.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ final class WorkerEvents
*/
const EXECUTE_JOB = 'worker.job.execute';

/**
* Dispatched when job is buried
* Listeners receive a JobBuriedEvent instance.
*/
const JOB_BURIED_EVENT = 'worker.job.buried';

/**
* Dispatched before executing an action.
* Listeners receive a ExecutionEvent instance.
Expand Down

0 comments on commit 57b2f0b

Please sign in to comment.