diff --git a/lib/good_job/active_job_extensions/concurrency.rb b/lib/good_job/active_job_extensions/concurrency.rb index 3cfb03dc2..6fe329393 100644 --- a/lib/good_job/active_job_extensions/concurrency.rb +++ b/lib/good_job/active_job_extensions/concurrency.rb @@ -30,7 +30,7 @@ def backtrace key = job.good_job_concurrency_key next(block.call) if key.blank? - GoodJob::Execution.new.with_advisory_lock(key: key, function: "pg_advisory_lock") do + GoodJob::Execution.with_advisory_lock(key: key, function: "pg_advisory_lock") do enqueue_concurrency = if enqueue_limit GoodJob::Execution.where(concurrency_key: key).unfinished.advisory_unlocked.count else @@ -61,7 +61,7 @@ def backtrace key = job.good_job_concurrency_key next if key.blank? - GoodJob::Execution.new.with_advisory_lock(key: key, function: "pg_advisory_lock") do + GoodJob::Execution.with_advisory_lock(key: key, function: "pg_advisory_lock") do allowed_active_job_ids = GoodJob::Execution.where(concurrency_key: key).advisory_locked.order(Arel.sql("COALESCE(performed_at, scheduled_at, created_at) ASC")).limit(perform_limit).pluck(:active_job_id) # The current job has already been locked and will appear in the previous query raise GoodJob::ActiveJobExtensions::Concurrency::ConcurrencyExceededError unless allowed_active_job_ids.include? job.job_id diff --git a/lib/good_job/lockable.rb b/lib/good_job/lockable.rb index 172285278..e2262ffe4 100644 --- a/lib/good_job/lockable.rb +++ b/lib/good_job/lockable.rb @@ -144,15 +144,22 @@ module Lockable # MyLockableRecord.order(created_at: :asc).limit(2).with_advisory_lock do |record| # do_something_with record # end - def with_advisory_lock(column: _advisory_lockable_column, function: advisory_lockable_function, unlock_session: false) + def with_advisory_lock(key: nil, column: _advisory_lockable_column, function: advisory_lockable_function, unlock_session: false) raise ArgumentError, "Must provide a block" unless block_given? - records = advisory_lock(column: column, function: function).to_a + records = if key + advisory_record_lock!(key: key, function: function) + else + advisory_lock(column: column, function: function).to_a + end + begin unscoped { yield(records) } ensure if unlock_session advisory_unlock_session + elsif key + advisory_record_unlock(key: key, function: advisory_unlockable_function(function)) unless $ERROR_INFO.is_a? RecordAlreadyAdvisoryLockedError else records.each do |record| record.advisory_unlock(key: record.lockable_column_key(column: column), function: advisory_unlockable_function(function)) @@ -161,6 +168,57 @@ def with_advisory_lock(column: _advisory_lockable_column, function: advisory_loc end end + # Acquires an advisory lock on this record or raises + # {RecordAlreadyAdvisoryLockedError} if it is already locked by another + # database session. + # @param key [String, Symbol] Key to lock against + # @param function [String, Symbol] Postgres Advisory Lock function name to use + # @raise [RecordAlreadyAdvisoryLockedError] + # @return [Boolean] +true+ + def advisory_record_lock!(key:, function: advisory_lockable_function) + result = advisory_record_lock(key: key, function: function) + result || raise(RecordAlreadyAdvisoryLockedError) + end + + # Acquires an advisory lock on this record if it is not already locked by + # another database session. Be careful to ensure you release the lock when + # you are done with {#advisory_record_unlock} to release all remaining locks. + # @param key [String, Symbol] Key to Advisory Lock against + # @param function [String, Symbol] Postgres Advisory Lock function name to use + # @return [Boolean] whether the lock was acquired. + def advisory_record_lock(key:, function: advisory_lockable_function) + query = if function.include? "_try_" + <<~SQL.squish + SELECT #{function}(('x'||substr(md5($1::text), 1, 16))::bit(64)::bigint) AS locked + SQL + else + <<~SQL.squish + SELECT #{function}(('x'||substr(md5($1::text), 1, 16))::bit(64)::bigint)::text AS locked + SQL + end + + binds = [ + ActiveRecord::Relation::QueryAttribute.new('key', key, ActiveRecord::Type::String.new), + ] + connection.exec_query(pg_or_jdbc_query(query), 'GoodJob::Lockable Advisory Lock', binds).first['locked'] + end + + # Releases an advisory lock on this record if it is locked by this database + # session. Note that advisory locks stack, so you must call + # {#advisory_unlock} and {#advisory_lock} the same number of times. + # @param key [String, Symbol] Key to lock against + # @param function [String, Symbol] Postgres Advisory Lock function name to use + # @return [Boolean] whether the lock was released. + def advisory_record_unlock(key:, function: advisory_unlockable_function) + query = <<~SQL.squish + SELECT #{function}(('x'||substr(md5($1::text), 1, 16))::bit(64)::bigint) AS unlocked + SQL + binds = [ + ActiveRecord::Relation::QueryAttribute.new('key', key, ActiveRecord::Type::String.new), + ] + connection.exec_query(pg_or_jdbc_query(query), 'GoodJob::Lockable Advisory Unlock', binds).first['unlocked'] + end + def _advisory_lockable_column advisory_lockable_column || primary_key end @@ -205,20 +263,7 @@ def pg_or_jdbc_query(query) # @param function [String, Symbol] Postgres Advisory Lock function name to use # @return [Boolean] whether the lock was acquired. def advisory_lock(key: lockable_key, function: advisory_lockable_function) - query = if function.include? "_try_" - <<~SQL.squish - SELECT #{function}(('x'||substr(md5($1::text), 1, 16))::bit(64)::bigint) AS locked - SQL - else - <<~SQL.squish - SELECT #{function}(('x'||substr(md5($1::text), 1, 16))::bit(64)::bigint)::text AS locked - SQL - end - - binds = [ - ActiveRecord::Relation::QueryAttribute.new('key', key, ActiveRecord::Type::String.new), - ] - self.class.connection.exec_query(pg_or_jdbc_query(query), 'GoodJob::Lockable Advisory Lock', binds).first['locked'] + self.class.advisory_record_lock(key: key, function: function) end # Releases an advisory lock on this record if it is locked by this database @@ -228,13 +273,7 @@ def advisory_lock(key: lockable_key, function: advisory_lockable_function) # @param function [String, Symbol] Postgres Advisory Lock function name to use # @return [Boolean] whether the lock was released. def advisory_unlock(key: lockable_key, function: self.class.advisory_unlockable_function(advisory_lockable_function)) - query = <<~SQL.squish - SELECT #{function}(('x'||substr(md5($1::text), 1, 16))::bit(64)::bigint) AS unlocked - SQL - binds = [ - ActiveRecord::Relation::QueryAttribute.new('key', key, ActiveRecord::Type::String.new), - ] - self.class.connection.exec_query(pg_or_jdbc_query(query), 'GoodJob::Lockable Advisory Unlock', binds).first['unlocked'] + self.class.advisory_record_unlock(key: key, function: function) end # Acquires an advisory lock on this record or raises @@ -245,8 +284,7 @@ def advisory_unlock(key: lockable_key, function: self.class.advisory_unlockable_ # @raise [RecordAlreadyAdvisoryLockedError] # @return [Boolean] +true+ def advisory_lock!(key: lockable_key, function: advisory_lockable_function) - result = advisory_lock(key: key, function: function) - result || raise(RecordAlreadyAdvisoryLockedError) + self.class.advisory_record_lock!(key: key, function: function) end # Acquires an advisory lock on this record and safely releases it after the diff --git a/spec/lib/good_job/lockable_spec.rb b/spec/lib/good_job/lockable_spec.rb index ecae097fc..47a81b9e1 100644 --- a/spec/lib/good_job/lockable_spec.rb +++ b/spec/lib/good_job/lockable_spec.rb @@ -120,6 +120,19 @@ expect(sql).to eq 'SELECT "good_jobs".* FROM "good_jobs"' end + + context 'when `key` option passed' do + it 'locks exactly one record by `key`' do + model_class.with_advisory_lock(key: execution.lockable_key) do + expect(execution.advisory_locked?).to be true + expect(execution.owns_advisory_lock?).to be true + expect(PgLock.advisory_lock.count).to eq 1 + end + + expect(execution.advisory_locked?).to be false + expect(execution.owns_advisory_lock?).to be false + end + end end describe '#advisory_lock' do