Skip to content

Commit

Permalink
Add row-level lock concern
Browse files Browse the repository at this point in the history
  • Loading branch information
bensheldon committed Jan 26, 2025
1 parent b6cbd53 commit ab475ba
Show file tree
Hide file tree
Showing 3 changed files with 142 additions and 0 deletions.
61 changes: 61 additions & 0 deletions app/models/concerns/good_job/row_lockable.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# frozen_string_literal: true

module GoodJob
#
# Adds Postgres row-locking (FOR UPDATE SKIP LOCKED) capabilities to an ActiveRecord record.
#
module RowLockable
extend ActiveSupport::Concern

included do
scope :row_lock, (lambda do |locked_by_id:, locked_at: Time.current|
original_query = self

jobs_table = arel_table
bind_locked_by_id = ActiveRecord::Relation::QueryAttribute.new("lock_id", locked_by_id, ActiveRecord::Type::String.new)
bind_locked_at = ActiveRecord::Relation::QueryAttribute.new("current_time", locked_at, ActiveRecord::Type::DateTime.new)

subquery = original_query.select(:id).arel
subquery.lock(Arel.sql("FOR NO KEY UPDATE SKIP LOCKED"))

# Get the binds from the original_query using to_sql_and_binds
_sql, original_query_binds, _preparable = connection.send(:to_sql_and_binds, original_query.arel)

# Build the update manager
update_manager = Arel::UpdateManager.new
update_manager.table(jobs_table)
update_manager.set([
[jobs_table[:locked_at], Arel::Nodes::BindParam.new(bind_locked_at)],
[jobs_table[:locked_by_id], Arel::Nodes::BindParam.new(bind_locked_by_id)],
])
update_manager.where(jobs_table[:id].in(subquery))
update_manager.take(1)

update_node = Arel::Nodes::UpdateStatement.new
update_node.relation = update_manager.ast.relation
update_node.values = update_manager.ast.values
update_node.wheres = update_manager.ast.wheres

results = connection.exec_query(
Arel.sql("#{update_node.to_sql} RETURNING *"),
"Lock Next Job",
[bind_locked_at, bind_locked_by_id] + original_query_binds
)

results.map { |result| instantiate(result.stringify_keys) }
end)

scope :row_locked, -> { where.not(locked_by_id: nil) }
scope :row_unlocked, -> { where(locked_by_id: nil) }
end

def row_locked?
locked_by_id.present?
end

def row_unlock
update!(locked_by_id: nil, locked_at: nil)
end
end
end

4 changes: 4 additions & 0 deletions app/models/good_job/job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ module GoodJob
# Active Record model that represents an +ActiveJob+ job.
class Job < BaseRecord
include AdvisoryLockable
include RowLockable
include ErrorEvents
include Filterable
include Reportable
Expand Down Expand Up @@ -190,6 +191,9 @@ class Job < BaseRecord
end
end)

# Find jobs that don't have a matching Process record
scope :illocked, -> { left_joins(:locked_by_process).where.not(locked_by_id: nil).where(locked_by_process: { id: nil }) }

class << self
# Parse a string representing a group of queues into a more readable data
# structure.
Expand Down
77 changes: 77 additions & 0 deletions spec/app/models/concerns/good_job/row_lockable_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
# frozen_string_literal: true

require 'rails_helper'

RSpec.describe GoodJob::AdvisoryLockable do
before do
stub_const "TestRecord", (Class.new(GoodJob::BaseRecord) do
include GoodJob::RowLockable
include GoodJob::AdvisoryLockable

self.table_name = "good_jobs"
end)
end

let(:model_class) { TestRecord }
let!(:job) { model_class.create!(active_job_id: SecureRandom.uuid, queue_name: "default") }
let!(:another_job) { model_class.create!(active_job_id: SecureRandom.uuid, queue_name: "default") }

around do |example|
RSpec.configure do |config|
config.expect_with :rspec do |c|
original_max_formatted_output_length = c.instance_variable_get(:@max_formatted_output_length)

c.max_formatted_output_length = 1000000
example.run

c.max_formatted_output_length = original_max_formatted_output_length
end
end
end

describe '.row_lock' do
it 'returns the locked record' do
locked_by_id = SecureRandom.uuid

locked_job = model_class.where(id: job.id).row_lock(locked_by_id: locked_by_id).first
expect(locked_job).to eq(job)
expect(locked_job.locked_by_id).to eq(locked_by_id)
expect(locked_job.locked_at).to be_present
end

it 'returns nil if no records are locked' do
locked_job = model_class.where(id: nil).row_lock(locked_by_id: SecureRandom.uuid)
expect(locked_job).to be_empty
end

it "respects the limit" do
locked_job = model_class.limit(2).row_lock(locked_by_id: SecureRandom.uuid)
expect(locked_job.to_a).to contain_exactly(job, another_job)
end
end

it "generates the appropriate SQL" do
connection = model_class.connection
allow(connection).to receive(:exec_query).and_call_original
allow(model_class).to receive(:connection).and_return(connection)

locked_by_id = SecureRandom.uuid

model_class.where(id: job.id).order(id: :asc).row_lock(locked_by_id: locked_by_id)

expect(connection).to have_received(:exec_query) do |sql, name, binds|
expect(normalize_sql(sql)).to eq normalize_sql(<<~SQL.squish)
UPDATE "good_jobs"
SET "locked_at" = $1, "locked_by_id" = $2
WHERE "good_jobs"."id" IN (
SELECT "good_jobs"."id"
FROM "good_jobs"
WHERE "good_jobs"."id" = $3
ORDER BY "good_jobs"."id" ASC
FOR NO KEY UPDATE SKIP LOCKED
)
RETURNING *
SQL
end
end
end

0 comments on commit ab475ba

Please sign in to comment.