Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add arbitrary lock on class level too #499

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions lib/good_job/active_job_extensions/concurrency.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def backtrace
key = job.good_job_concurrency_key
next(block.call) if key.blank?

GoodJob::Execution.new.with_advisory_lock(key: key, function: "pg_advisory_lock") do
GoodJob::Execution.with_advisory_lock(key: key, function: "pg_advisory_lock") do
enqueue_concurrency = if enqueue_limit
GoodJob::Execution.where(concurrency_key: key).unfinished.advisory_unlocked.count
else
Expand Down Expand Up @@ -61,7 +61,7 @@ def backtrace
key = job.good_job_concurrency_key
next if key.blank?

GoodJob::Execution.new.with_advisory_lock(key: key, function: "pg_advisory_lock") do
GoodJob::Execution.with_advisory_lock(key: key, function: "pg_advisory_lock") do
allowed_active_job_ids = GoodJob::Execution.where(concurrency_key: key).advisory_locked.order(Arel.sql("COALESCE(performed_at, scheduled_at, created_at) ASC")).limit(perform_limit).pluck(:active_job_id)
# The current job has already been locked and will appear in the previous query
raise GoodJob::ActiveJobExtensions::Concurrency::ConcurrencyExceededError unless allowed_active_job_ids.include? job.job_id
Expand Down
88 changes: 63 additions & 25 deletions lib/good_job/lockable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -144,15 +144,22 @@ module Lockable
# MyLockableRecord.order(created_at: :asc).limit(2).with_advisory_lock do |record|
# do_something_with record
# end
def with_advisory_lock(column: _advisory_lockable_column, function: advisory_lockable_function, unlock_session: false)
def with_advisory_lock(key: nil, column: _advisory_lockable_column, function: advisory_lockable_function, unlock_session: false)
raise ArgumentError, "Must provide a block" unless block_given?

records = advisory_lock(column: column, function: function).to_a
records = if key
advisory_record_lock!(key: key, function: function)
else
advisory_lock(column: column, function: function).to_a
end

begin
unscoped { yield(records) }
ensure
if unlock_session
advisory_unlock_session
elsif key
advisory_record_unlock(key: key, function: advisory_unlockable_function(function)) unless $ERROR_INFO.is_a? RecordAlreadyAdvisoryLockedError
else
records.each do |record|
record.advisory_unlock(key: record.lockable_column_key(column: column), function: advisory_unlockable_function(function))
Expand All @@ -161,6 +168,57 @@ def with_advisory_lock(column: _advisory_lockable_column, function: advisory_loc
end
end

# Acquires an advisory lock on this record or raises
# {RecordAlreadyAdvisoryLockedError} if it is already locked by another
# database session.
# @param key [String, Symbol] Key to lock against
# @param function [String, Symbol] Postgres Advisory Lock function name to use
# @raise [RecordAlreadyAdvisoryLockedError]
# @return [Boolean] +true+
def advisory_record_lock!(key:, function: advisory_lockable_function)
result = advisory_record_lock(key: key, function: function)
result || raise(RecordAlreadyAdvisoryLockedError)
end

# Acquires an advisory lock on this record if it is not already locked by
# another database session. Be careful to ensure you release the lock when
# you are done with {#advisory_record_unlock} to release all remaining locks.
# @param key [String, Symbol] Key to Advisory Lock against
# @param function [String, Symbol] Postgres Advisory Lock function name to use
# @return [Boolean] whether the lock was acquired.
def advisory_record_lock(key:, function: advisory_lockable_function)
query = if function.include? "_try_"
<<~SQL.squish
SELECT #{function}(('x'||substr(md5($1::text), 1, 16))::bit(64)::bigint) AS locked
SQL
else
<<~SQL.squish
SELECT #{function}(('x'||substr(md5($1::text), 1, 16))::bit(64)::bigint)::text AS locked
SQL
end

binds = [
ActiveRecord::Relation::QueryAttribute.new('key', key, ActiveRecord::Type::String.new),
]
connection.exec_query(pg_or_jdbc_query(query), 'GoodJob::Lockable Advisory Lock', binds).first['locked']
end

# Releases an advisory lock on this record if it is locked by this database
# session. Note that advisory locks stack, so you must call
# {#advisory_unlock} and {#advisory_lock} the same number of times.
# @param key [String, Symbol] Key to lock against
# @param function [String, Symbol] Postgres Advisory Lock function name to use
# @return [Boolean] whether the lock was released.
def advisory_record_unlock(key:, function: advisory_unlockable_function)
query = <<~SQL.squish
SELECT #{function}(('x'||substr(md5($1::text), 1, 16))::bit(64)::bigint) AS unlocked
SQL
binds = [
ActiveRecord::Relation::QueryAttribute.new('key', key, ActiveRecord::Type::String.new),
]
connection.exec_query(pg_or_jdbc_query(query), 'GoodJob::Lockable Advisory Unlock', binds).first['unlocked']
end

def _advisory_lockable_column
advisory_lockable_column || primary_key
end
Expand Down Expand Up @@ -205,20 +263,7 @@ def pg_or_jdbc_query(query)
# @param function [String, Symbol] Postgres Advisory Lock function name to use
# @return [Boolean] whether the lock was acquired.
def advisory_lock(key: lockable_key, function: advisory_lockable_function)
query = if function.include? "_try_"
<<~SQL.squish
SELECT #{function}(('x'||substr(md5($1::text), 1, 16))::bit(64)::bigint) AS locked
SQL
else
<<~SQL.squish
SELECT #{function}(('x'||substr(md5($1::text), 1, 16))::bit(64)::bigint)::text AS locked
SQL
end

binds = [
ActiveRecord::Relation::QueryAttribute.new('key', key, ActiveRecord::Type::String.new),
]
self.class.connection.exec_query(pg_or_jdbc_query(query), 'GoodJob::Lockable Advisory Lock', binds).first['locked']
self.class.advisory_record_lock(key: key, function: function)
end

# Releases an advisory lock on this record if it is locked by this database
Expand All @@ -228,13 +273,7 @@ def advisory_lock(key: lockable_key, function: advisory_lockable_function)
# @param function [String, Symbol] Postgres Advisory Lock function name to use
# @return [Boolean] whether the lock was released.
def advisory_unlock(key: lockable_key, function: self.class.advisory_unlockable_function(advisory_lockable_function))
query = <<~SQL.squish
SELECT #{function}(('x'||substr(md5($1::text), 1, 16))::bit(64)::bigint) AS unlocked
SQL
binds = [
ActiveRecord::Relation::QueryAttribute.new('key', key, ActiveRecord::Type::String.new),
]
self.class.connection.exec_query(pg_or_jdbc_query(query), 'GoodJob::Lockable Advisory Unlock', binds).first['unlocked']
self.class.advisory_record_unlock(key: key, function: function)
end

# Acquires an advisory lock on this record or raises
Expand All @@ -245,8 +284,7 @@ def advisory_unlock(key: lockable_key, function: self.class.advisory_unlockable_
# @raise [RecordAlreadyAdvisoryLockedError]
# @return [Boolean] +true+
def advisory_lock!(key: lockable_key, function: advisory_lockable_function)
result = advisory_lock(key: key, function: function)
result || raise(RecordAlreadyAdvisoryLockedError)
self.class.advisory_record_lock!(key: key, function: function)
end

# Acquires an advisory lock on this record and safely releases it after the
Expand Down
13 changes: 13 additions & 0 deletions spec/lib/good_job/lockable_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,19 @@

expect(sql).to eq 'SELECT "good_jobs".* FROM "good_jobs"'
end

context 'when `key` option passed' do
it 'locks exactly one record by `key`' do
model_class.with_advisory_lock(key: execution.lockable_key) do
expect(execution.advisory_locked?).to be true
expect(execution.owns_advisory_lock?).to be true
expect(PgLock.advisory_lock.count).to eq 1
end

expect(execution.advisory_locked?).to be false
expect(execution.owns_advisory_lock?).to be false
end
end
end

describe '#advisory_lock' do
Expand Down