Skip to content

Commit

Permalink
active record changes and fixing some small bugs.
Browse files Browse the repository at this point in the history
  • Loading branch information
Rob Ares committed Nov 9, 2008
1 parent 0be1cfa commit 4b9b079
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 20 deletions.
37 changes: 17 additions & 20 deletions lib/delayed/job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,17 @@ class Job < ActiveRecord::Base
self.min_priority = nil
self.max_priority = nil

NextTaskSQL = '(`locked_by` = ?) OR (`run_at` <= ? AND (`locked_at` IS NULL OR `locked_at` < ?))'
# Conditions to find tasks that are locked by this process or one that has
# been created before now and is not currently locked.
NextTaskSQL = '(locked_by = ?) OR (run_at <= ? AND (locked_at IS NULL OR locked_at < ?))'
NextTaskOrder = 'priority DESC, run_at ASC'
ParseObjectFromYaml = /\!ruby\/\w+\:([^\s]+)/

class LockError < StandardError
end

def self.clear_locks!
connection.execute "UPDATE #{table_name} SET `locked_by`=NULL, `locked_at`=NULL WHERE `locked_by`=#{quote_value(worker_name)}"
update_all("locked_by = null, locked_at = null", ["locked_by = ?", worker_name])
end

def payload_object
Expand Down Expand Up @@ -65,31 +67,32 @@ def self.find_available(limit = 5)
time_now = db_time_now

sql = NextTaskSQL.dup
conditions = [time_now, time_now, worker_name]
conditions = [worker_name, time_now, time_now]

if self.min_priority
sql << ' AND (`priority` >= ?)'
sql << ' AND (priority >= ?)'
conditions << min_priority
end

if self.max_priority
sql << ' AND (`priority` <= ?)'
sql << ' AND (priority <= ?)'
conditions << max_priority
end

conditions.unshift(sql)

ActiveRecord::Base.silence do
find(:all, :conditions => conditions, :order => NextTaskOrder, :limit => limit)
end

end


# Get the payload of the next job we can get an exclusive lock on.
# If no jobs are left we return nil
def self.reserve(max_run_time = 4.hours)

# We get up to 5 jobs from the db. In face we cannot get exclusive access to a job we try the next.
# We get up to 5 jobs from the db. In case we cannot get exclusive access to a job we try the next.
# this leads to a more even distribution of jobs across the worker processes
find_available(5).each do |job|
begin
Expand Down Expand Up @@ -124,22 +127,16 @@ def lock_exclusively!(max_run_time, worker = worker_name)
affected_rows = if locked_by != worker

# We don't own this job so we will update the locked_by name and the locked_at
connection.update(<<-end_sql, "#{self.class.name} Update to aquire exclusive lock")
UPDATE #{self.class.table_name}
SET `locked_at`=#{quote_value(now)}, `locked_by`=#{quote_value(worker)}
WHERE #{self.class.primary_key} = #{quote_value(id)} AND (`locked_at` IS NULL OR `locked_at` < #{quote_value(now - max_run_time.to_i)})
end_sql

transaction do
Job.update_all(["locked_at = ?, locked_by = ?", now, worker], ["id = ? and (locked_at is null or locked_at < ?)", id, (now - max_run_time.to_i)])
end
else

# We already own this job, this may happen if the job queue crashes.
# Simply resume and update the locked_at
connection.update(<<-end_sql, "#{self.class.name} Update exclusive lock")
UPDATE #{self.class.table_name}
SET `locked_at`=#{quote_value(now)}
WHERE #{self.class.primary_key} = #{quote_value(id)} AND (`locked_by`=#{quote_value(worker)})
end_sql

transaction do
Job.update_all(["locked_at = ?", now], ["id = ? and (locked_by = ?)", id, worker])
end
end

unless affected_rows == 1
Expand All @@ -149,7 +146,7 @@ def lock_exclusively!(max_run_time, worker = worker_name)
self.locked_at = now
self.locked_by = worker
end

def unlock
self.locked_at = nil
self.locked_by = nil
Expand Down
16 changes: 16 additions & 0 deletions spec/job_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -173,4 +173,20 @@ def perform; raise 'did not work'; end

end

context "when retreiving jobs" do
before(:each) do
@simple_job = SimpleJob.new
@job = Delayed::Job.create :payload_object => @simple_job, :locked_by => 'worker1', :locked_at => Delayed::Job.db_time_now - 5.minutes
end

it "should return jobs that haven't been processed yet" do
SimpleJob.runs.should == 0
# Delayed::Job.should_receive(:find_available).once.with(5).and_return([@job])
Delayed::Job.should_receive(:reserve).once.and_yield(@job.payload_object)
Delayed::Job.work_off(1)
SimpleJob.runs.should == 1
end

end

end

0 comments on commit 4b9b079

Please sign in to comment.