diff --git a/DJJob.php b/DJJob.php index d755e29..b16b359 100644 --- a/DJJob.php +++ b/DJJob.php @@ -4,6 +4,9 @@ class DJException extends Exception { } +/** + * Exception thrown when the job should be retried after a specific period. + */ class DJRetryException extends DJException { private $delay_seconds = 7200; @@ -26,6 +29,9 @@ interface DJJobHandlerInterface { public function perform(); } +/** + * Base class for delayed job. + */ class DJBase { // error severity levels @@ -35,19 +41,47 @@ class DJBase { const INFO = 1; const DEBUG = 0; + /** + * @var int + */ private static $log_level = self::DEBUG; + /** + * @var null|PDO + */ private static $db = null; + + /** + * @var string + */ protected static $jobsTable = ""; + /** + * @var string + */ private static $dsn = ""; + + /** + * @var string + */ private static $user = ""; + + /** + * @var string + */ private static $password = ""; + + /** + * @var int + */ private static $retries = 3; //default retries // use either `configure` or `setConnection`, depending on if // you already have a PDO object you can re-use + /** + * Configures DJJob with certain values for the database connection. + */ public static function configure(){ $args = func_get_args(); $numArgs = func_num_args(); @@ -76,6 +110,15 @@ public static function configure(){ } } + /** + * Configures DJJob with certain values for the database connection. + * + * @param $dsn The PDO connection string. + * @param array $options The options for the PDO connection. + * @param string $jobsTable Name of the jobs table. + * + * @throws DJException Throws an exception with invalid parameters. + */ protected static function configureWithDsnAndOptions($dsn, array $options = array(), $jobsTable = 'jobs') { if (!isset($options['mysql_user'])){ throw new DJException("Please provide the database user in configure options array."); @@ -96,6 +139,12 @@ protected static function configureWithDsnAndOptions($dsn, array $options = arra } } + /** + * @param array $options + * @param string $jobsTable + * + * @throws DJException Throws an exception with invalid parameters. + */ protected static function configureWithOptions(array $options, $jobsTable = 'jobs') { if (!isset($options['driver'])){ @@ -129,14 +178,28 @@ protected static function configureWithOptions(array $options, $jobsTable = 'job } } + /** + * @param int $const The log level to set. + */ public static function setLogLevel($const) { self::$log_level = $const; } + /** + * @param PDO $db The database connection to use. + */ public static function setConnection(PDO $db) { self::$db = $db; } + /** + * Returns the connection DJBase knows about. + * + * Tries to connect if no connection is present. + * + * @return null|PDO The connection if a valid connection is present. + * @throws Exception + */ protected static function getConnection() { if (self::$db === null) { try { @@ -149,6 +212,15 @@ protected static function getConnection() { return self::$db; } + /** + * Runs a query with a resultset against the database. + * + * @param string $sql The query to execute. + * @param array $params The params necessary for a prepared statement. + * + * @return array Returns the complete resultset. + * @throws DJException Throws if the query couldn't be executed. + */ public static function runQuery($sql, $params = array()) { for ($attempts = 0; $attempts < self::$retries; $attempts++) { try { @@ -180,6 +252,15 @@ public static function runQuery($sql, $params = array()) { throw new DJException("DJJob exhausted retries connecting to database"); } + /** + * Runs an update query against the database. + * + * @param string $sql The query to execute. + * @param array $params The params necessary for the prepared statement. + * + * @return int The amount of affected rows. + * @throws DJException Throws if the query couldn't be executed. + */ public static function runUpdate($sql, $params = array()) { for ($attempts = 0; $attempts < self::$retries; $attempts++) { try { @@ -202,6 +283,12 @@ public static function runUpdate($sql, $params = array()) { throw new DJException("DJJob exhausted retries connecting to database"); } + /** + * Logs a message to the output. + * + * @param string $mesg The message to log. + * @param int $severity The log level necessary for this message to display. + */ protected static function log($mesg, $severity=self::CRITICAL) { if ($severity >= self::$log_level) { printf("[%s] %s\n", date('c'), $mesg); @@ -209,10 +296,25 @@ protected static function log($mesg, $severity=self::CRITICAL) { } } +/** + * The worker class that can empty a queue. + */ class DJWorker extends DJBase { # This is a singleton-ish thing. It wouldn't really make sense to # instantiate more than one in a single request (or commandline task) + /** + * DJWorker constructor. + * + * The following options are available: + * `queue`: The queue to work on. Default: 'default' + * `count`: How many jobs to execute before exiting. Use '0' for no-limit. Default: '0' + * `sleep`: How long to sleep if no jobs are found. Default: '5' + * `max_attempts`: How many times to try a job before bailing out. Default: '5' + * `fail_on_output`: Whether to fail on output. Default: 'false' + * + * @param array $options The settings for this worker. + */ public function __construct($options = array()) { $options = array_merge(array( "queue" => "default", @@ -233,6 +335,11 @@ public function __construct($options = array()) { } } + /** + * Handles a signal from the operating system. + * + * @param string $signo The signal received from the OS. + */ public function handleSignal($signo) { $signals = array( SIGTERM => "SIGTERM", @@ -245,6 +352,9 @@ public function handleSignal($signo) { die(0); } + /** + * Releases all locks this worker has on the jobs table. + */ public function releaseLocks() { $this->runUpdate(" UPDATE " . self::$jobsTable . " @@ -259,7 +369,8 @@ public function releaseLocks() { * why this? * run newest first, some jobs get left behind * run oldest first, all jobs get left behind - * @return DJJob + * + * @return \DJJob|false A job if one was successfully locked. Otherwise false. */ public function getNewJob() { # we can grab a locked job if we own the lock @@ -289,6 +400,9 @@ public function getNewJob() { return false; } + /** + * Starts the worker process. + */ public function start() { $this->log("[JOB] Starting worker {$this->name} on queue::{$this->queue}", self::INFO); @@ -318,8 +432,22 @@ public function start() { } } +/** + * Represents a job that needs to be executed. + */ class DJJob extends DJBase { + /** + * Constructs the Job + * + * Possible options: + * `max_attempts`: The amount of attempts before bailing out. Default: '5' + * `fail_on_output`: Whether the job fails if there is output in the handler. Default: 'false' + * + * @param string $worker_name Name of the worker that created this job. + * @param int $job_id ID of this job. + * @param array $options The options. + */ public function __construct($worker_name, $job_id, $options = array()) { $options = array_merge(array( "max_attempts" => 5, @@ -331,6 +459,13 @@ public function __construct($worker_name, $job_id, $options = array()) { $this->fail_on_output = $options["fail_on_output"]; } + /** + * Runs this job. + * + * First retrieves the handler fro the database. Then perform the job. + * + * @return bool Whether or not the job succeeded. + */ public function run() { # pull the handler from the db $handler = $this->getHandler(); @@ -392,6 +527,11 @@ public function run() { } } + /** + * Acquires lock on this job. + * + * @return bool Whether or not acquiring the lock succeeded. + */ public function acquireLock() { $this->log("[JOB] attempting to acquire lock for job::{$this->job_id} on {$this->worker_name}", self::INFO); @@ -409,6 +549,9 @@ public function acquireLock() { return true; } + /** + * Releases the lock on this job. + */ public function releaseLock() { $this->runUpdate(" UPDATE " . self::$jobsTable . " @@ -418,6 +561,9 @@ public function releaseLock() { ); } + /** + * Finishes this job. Will delete it from the jobs table. + */ public function finish() { $this->runUpdate( "DELETE FROM " . self::$jobsTable . " WHERE id = ?", @@ -426,6 +572,12 @@ public function finish() { $this->log("[JOB] completed job::{$this->job_id}", self::INFO); } + /** + * Finishes this job, but with an error. Keeps the job in the jobs table. + * + * @param string $error The error message to write to the job. + * @param null|object $handler The handler that ran this job. + */ public function finishWithError($error, $handler = null) { $this->runUpdate(" UPDATE " . self::$jobsTable . " @@ -449,6 +601,11 @@ public function finishWithError($error, $handler = null) { } } + /** + * Saves a retry date to this job. + * + * @param int $delay The amount of seconds to delay this job. + */ public function retryLater($delay) { $this->runUpdate(" UPDATE " . self::$jobsTable . " @@ -463,6 +620,11 @@ public function retryLater($delay) { $this->releaseLock(); } + /** + * Returns the handler for this job. + * + * @return bool|object The handler object for this job. Or false if it failed. + */ public function getHandler() { $rs = $this->runQuery( "SELECT handler FROM " . self::$jobsTable . " WHERE id = ?", @@ -472,6 +634,11 @@ public function getHandler() { return false; } + /** + * Returns the amount of attempts left for this job. + * + * @return bool The amount of attempts left. + */ public function getAttempts() { $rs = $this->runQuery( "SELECT attempts FROM " . self::$jobsTable . " WHERE id = ?", @@ -481,6 +648,15 @@ public function getAttempts() { return false; } + /** + * Enqueues a job to the database. + * + * @param object $handler The handler that can execute this job. + * @param string $queue The queue to enqueue this job to. All queues are saved in the same table. + * @param string $run_at A valid mysql DATETIME string at which to run the jobs. + * + * @return bool|string Returns the last inserted ID or false if enqueuing failed. + */ public static function enqueue($handler, $queue = "default", $run_at = null) { $affected = self::runUpdate( "INSERT INTO " . self::$jobsTable . " (handler, queue, run_at, created_at) VALUES(?, ?, ?, NOW())", @@ -495,6 +671,15 @@ public static function enqueue($handler, $queue = "default", $run_at = null) { return self::getConnection()->lastInsertId(); // return the job ID, for manipulation later } + /** + * Bulk enqueues a lot of jobs to the database. + * + * @param object[] $handlers An array of handlers to enqueue. + * @param string $queue The queue to enqueue the handlers to. + * @param string $run_at A valid mysql DATETIME string at which to run the jobs. + * + * @return bool + */ public static function bulkEnqueue($handlers, $queue = "default", $run_at = null) { $sql = "INSERT INTO " . self::$jobsTable . " (handler, queue, run_at, created_at) VALUES"; $sql .= implode(",", array_fill(0, count($handlers), "(?, ?, ?, NOW())")); @@ -518,6 +703,13 @@ public static function bulkEnqueue($handlers, $queue = "default", $run_at = null return true; } + /** + * Returns the general status of the jobs table. + * + * @param string $queue The queue of which to see the status for. + * + * @return array Information about the status. + */ public static function status($queue = "default") { $rs = self::runQuery(" SELECT COUNT(*) as total, COUNT(failed_at) as failed, COUNT(locked_at) as locked