Custom advisory locks to prevent certain jobs from being worked on concurrently? #206

Closed
reczy opened this issue Jan 24, 2021 · 18 comments · Fixed by #281
@reczy
Contributor

reczy commented Jan 24, 2021

Hi Ben,

First of all, thanks so much for your hard work on this gem. I've been following the project since it showed up on HN a few months ago and recently started porting over my background jobs from Resque.

I'm submitting this issue to see if you'd be open to custom advisory locks? In my case, the purpose would be to prevent certain jobs (across multiple job classes) that work on data for a particular tenant from being worked on concurrently. In other words - perform only one job at a time for a particular account/organization, based on the job arguments.

As a workaround for now, I am grabbing a second advisory lock in around_perform that's based on a particular job argument, but this approach is obviously inefficient and fails to take advantage of your nice code already in Lockable.
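Roughly, that workaround looks like this (a simplified sketch; the key format and retry policy here are placeholders, not my exact code):

# jobs/tenant_scoped_job.rb
class TenantScopedJob < ApplicationJob
  TenantLocked = Class.new(StandardError)
  retry_on TenantLocked, wait: 10.seconds, attempts: Float::INFINITY

  around_perform do |job, block|
    key = ActiveRecord::Base.connection.quote("TENANT_LOCK_#{job.arguments.first}")
    # Reduce the text key to a 64-bit integer, mirroring Lockable's approach (md5 -> 16 hex chars -> bit(64))
    lock_fn = "('x' || substr(md5(#{key}), 1, 16))::bit(64)::bigint"

    locked = ActiveRecord::Base.connection.select_value("SELECT pg_try_advisory_lock(#{lock_fn})")
    raise TenantLocked unless locked

    begin
      block.call
    ensure
      ActiveRecord::Base.connection.select_value("SELECT pg_advisory_unlock(#{lock_fn})")
    end
  end
end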

Currently, you use part of the md5 hash of (1) the good_job table name and (2) the job primary key to determine the advisory lock for a particular job. Ideally, I'd have the opportunity to specify the text your Lockable module feeds to md5. In my opinion, doing it this way (rather than allowing the user to just specify the final 64-bit integer or 2 32-bit integers) seems like a decent trade-off between control and complexity. I suppose that just being able to specify (2) would be fine as well since the table_name doesn't change.

I've tried to think through the easiest way of accomplishing this. One option is to allow a job method that would return a string to be fed to md5 (if not present, then fallback to the current way of doing things).

PSEUDOCODE:

# jobs/specific_job.rb
class SpecificJob < ApplicationJob
  
  # args would be job arguments
  def self.good_job_lock_text(*args)
    "TENANT_LOCK_#{args.first}"
  end

  # ...
end

Then in GoodJob::Lockable

def hashable_lock_text
  <user defined text from good_job_lock_text> or <table_name + primary_key>
end

and elsewhere in Lockable
md5(#{hashable_lock_text})


So anyway, I'd love to hear your thoughts on whether this is something you'd consider for this gem. I have no attachment to the specific design suggestion above - just trying to spark ideas. It would be great to be able to rely on the code already in Lockable to limit job concurrency (and avoid duplicating db locking).

Thanks!

@bensheldon
Owner

@reczy thanks for proposing this idea! I think it's possible to extend GoodJob / Lockable, but with some tweaks.

I think it would be fairly simple to give Lockable a configurable SQL lock string that gets MD5ed. It would need to be done entirely in SQL though, meaning it would probably end up looking like md5(COALESCE(custom_column, id))....

The trickiest part would be figuring out the scope :advisory_lock which has to rewrite the query to fit within a CTE: https://github.com/bensheldon/good_job/blob/main/lib/good_job/lockable.rb#L44-L46

I think that's possible.

The caveat is that the GoodJob::Job / Lockable locking code has no awareness of ActiveJob or different job classes. The lock string would need to be global rather than per-job, which probably makes the SQL even more complicated.

If you wanted to try extending GoodJob::Lockable, I'd definitely consider the change. But it's low priority otherwise. I'd initially suggest what I think you're already doing: try locking the secondary record and then retry the job if it's already locked elsewhere.

@reczy
Contributor Author

reczy commented Jan 25, 2021

Thanks for the consideration and additional suggestions! I think starting with the advisory_lock scope you linked to is helpful context and helps narrow the focus a bit here.

For my use case, using a global lock string would introduce too much of a potential bottleneck. However, I still think it's possible to set the lock string on a per-job basis if we do it at job creation.

What about something like:

SpecificJob.perform_later({ tenant_id: 999 })

# jobs/specific_job.rb
class SpecificJob < ApplicationJob
  
  # ...

  def self.good_job_lock_key(*args)
    "TENANT_LOCK_#{args.first[:tenant_id]}"
  end
  
  # ...

end

Then in GoodJob::Job

    def self.enqueue(active_job, scheduled_at: nil, create_with_advisory_lock: false)
      good_job = nil
      ActiveSupport::Notifications.instrument("enqueue_job.good_job", { active_job: active_job, scheduled_at: scheduled_at, create_with_advisory_lock: create_with_advisory_lock }) do |instrument_payload|
        good_job = GoodJob::Job.new(
          queue_name: active_job.queue_name.presence || DEFAULT_QUEUE_NAME,
          priority: active_job.priority || DEFAULT_PRIORITY,
          serialized_params: active_job.serialize,
          scheduled_at: scheduled_at,
          create_with_advisory_lock: create_with_advisory_lock
        )
        
        # NEW STUFF -->

        job_klass = good_job.serialized_params['job_class'].constantize

        if job_klass.respond_to?(:good_job_lock_key)

          # If we want to save this value in an optional column
          if good_job.has_attribute?(:lock_key)
            good_job.lock_key = job_klass.good_job_lock_key(*active_job.arguments)
          end

          # Alternatively, if we'd rather just save it in the existing jsonb column
          good_job.serialized_params['good_job_lock_key'] = job_klass.good_job_lock_key(*active_job.arguments)
        end

        # <-- NEW STUFF

        instrument_payload[:good_job] = good_job

        good_job.save!
        active_job.provider_job_id = good_job.id
      end

      good_job
    end

Then, of course, we could use your suggestion of md5(COALESCE(custom_column, id))... or similar to grab the proper value.

I think this could work, but that still leaves the decision of whether to store the lock key in its own column or in the existing serialized_params jsonb column. Do you have a preference? I think the former would be simpler at the expense of a little additional setup by the user (though only for the subset of users that want to control the locks). Going with the latter would probably entail slightly more complex SQL and means we're adding metadata to the serialized_params column that's beyond the serialized active_job.
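For illustration, the two options would plug into the lock function along these lines (hypothetical names; a sketch only):

# (1) Dedicated column:
lock_fn = "('x' || substr(md5(COALESCE(lock_key, good_jobs.id::text)), 1, 16))::bit(64)::bigint"

# (2) Existing jsonb column:
lock_fn = "('x' || substr(md5(COALESCE(serialized_params->>'good_job_lock_key', good_jobs.id::text)), 1, 16))::bit(64)::bigint"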

I recognize this is not a priority for you but sincerely appreciate your thoughts to help mitigate any dead-ends and stay in line with your future vision of good_job!

Thanks!
Mike

@bensheldon
Owner

This code looks like what I was thinking, though with a twist: I would like GoodJob to be extensible enough to achieve this functionality, but I do not want to build a separate lock column as core functionality, because the core of GoodJob is focused on ActiveJob functionality.

I'm imagining that, if GoodJob were extensible enough to allow this, the implementation in your app would look something like this:

# config/initializers/good_job

# It looks like there isn't a good notification that intercepts the not-yet-saved GoodJob::Job record, 
# but that's easily added into the Job.enqueue method
ActiveSupport::Notifications.subscribe "before_enqueue_job.good_job" do |event|
  good_job, active_job = event.payload.values_at(:good_job, :active_job)
  good_job.lock_key = active_job.lock_key if active_job.respond_to?(:lock_key)
  # You would have made your own migration to extend the `good_jobs` table with the lock_key column.
end

GoodJob::Job.lockable_function_sql = "COALESCE(lock_key, id)" # <= I think this is the hard part to make work.

@reczy
Contributor Author

reczy commented Jan 26, 2021

Oh, I see what you're saying. I'll have to play around with this a bit to see what's possible here.

Thanks, Ben!

@morgoth
Collaborator

morgoth commented Jan 31, 2021

Hey @reczy I think you might be interested in PR rails/rails#40337, which might end up in Rails itself.

@bensheldon
Owner

I have reconsidered my objections, described in #255, and now believe that this would be a good capability to integrate into GoodJob regardless of whether ActiveJob defines a core interface for it. I've added this Issue to the prioritized backlog. Let's do this 🚀

@reczy
Contributor Author

reczy commented May 14, 2021

@bensheldon Amazing news!

From #255

I will re-open a representative issue for further discussion and design of those capabilities.

Sounds good!

@reczy
Contributor Author

reczy commented May 16, 2021

@bensheldon I'm going to be a bit short on time and unable to meaningfully contribute over the next couple of weeks, but I wanted to at least start by throwing out some code that seems to be working for me. There may well be some errors here and it's 100% not production-ready, but hopefully it can help prevent some duplicated effort. Sorry it's not a real PR; this is more thinking out loud with examples.

I initially started thinking about including 1 bigint or 2 integer columns (lock_classid, lock_objid) to track the advisory locks, but the signed 64 bit / signed 32 bit to unsigned 32 bit oid conversions are not the most straightforward. So, I ended up with a lock_key text column:

    create_table :good_jobs, id: :uuid do |t|
      t.text :lock_key, default: -> { "substr(md5(random()::text), 1, 16)" }, null: false
      ...
    end

    execute <<-SQL
      ALTER TABLE "good_jobs"
      ADD CONSTRAINT "good_jobs_lock_key_constraint"
      CHECK ( length(lock_key) = 16 AND lock_key ~ '^[0-9a-f]+$' );
    SQL

I think developers would want to avoid having jobs running during this migration.

# lockable.rb

scope :advisory_lock, (lambda do
  original_query = self

  cte_table = Arel::Table.new(:rows)

  # ***** Add :lock_key to select ******
  cte_query = original_query.select([:id, :lock_key]).except(:limit)

  cte_type = if supports_cte_materialization_specifiers?
               'MATERIALIZED'
             else
               ''
             end

  composed_cte = Arel::Nodes::As.new(cte_table, Arel::Nodes::SqlLiteral.new([cte_type, "(", cte_query.to_sql, ")"].join(' ')))

  # ***** Change lock calculation ******
  query = cte_table.project(cte_table[:id])
                   .with(composed_cte)
                   .where(Arel.sql(sanitize_sql_for_conditions(["pg_try_advisory_lock(('x' || #{connection.quote_table_name(cte_table.name)}.#{connection.quote_column_name(:lock_key)})::bit(64)::bigint)"])))

  limit = original_query.arel.ast.limit
  query.limit = limit.value if limit.present?

  unscoped.where(arel_table[primary_key].in(query)).merge(original_query.only(:order))
end)

The only changes here are (1) the lock calculation in query and (2) the fact that cte_query now selects two columns into "rows" (id and lock_key). Do you think (2) is the right choice? I would guess that it's quicker to find a job by the primary_key uuid than by a 16-character text value, and I think this might mean we don't need an index on lock_key. I think the outer WHERE of the scope would need to change if you wanted "rows" to consist of only lock_key, to avoid pulling from finished/future jobs (since lock_key is not unique). The downside to keeping both is that we approximately double the size of "rows".

Other changes to Lockable would mostly revolve around the change in calculation of the lock - for example:

    def advisory_lock
      query = <<~SQL.squish
        SELECT 1 AS one
        WHERE pg_try_advisory_lock(('x' || $1)::bit(64)::bigint)
      SQL
      binds = [[nil, send(:lock_key)]]
      self.class.connection.exec_query(pg_or_jdbc_query(query), 'GoodJob::Lockable Advisory Lock', binds).any?
    end

    # ...

    def advisory_locked?
      query = <<~SQL.squish
        SELECT 1 AS one
        FROM pg_locks
        WHERE pg_locks.locktype = 'advisory'
          AND pg_locks.objsubid = 1
          AND pg_locks.classid = ('x' || substr($1, 1, 8))::bit(32)::int
          AND pg_locks.objid = ('x' || substr($2, 9, 8))::bit(32)::int
      SQL
      binds = [[nil, send(:lock_key)], [nil, send(:lock_key)]]
      self.class.connection.exec_query(pg_or_jdbc_query(query), 'GoodJob::Lockable Advisory Locked?', binds).any?
    end

Notably missing here is the public API for users to control the lock_key. Not sure if you wanted it to look like the ActiveJob concurrency PR in rails or something else. For my own use case, the important thing is that locks can be controlled per job, per job class, and across multiple job classes (which I suppose just boils down to controllability on a per-job basis).

@bensheldon
Owner

@reczy thank you for thinking through this proposal. One thing that sticks out (though maybe not necessary for the implementation) is that database migrations need to be supportable/representable in schema.rb; I do not want to require end users to switch to structure.sql.

It's maybe a bit of a diversion from where this Issue started, but I'd like to refocus on supporting the interface proposed in rails/rails#40337, which allows for a configurable level of concurrency rather than just 1. To do that, I think it would require making an additional query after a particular job has been locked to count the concurrency, and then aborting/retrying if concurrency is exceeded. I think this scheme would also require an additional indexed concurrency_key column, though I'm imagining it would be a plaintext string that was up to the user to define (e.g. -> { |job| [job.class.name, job.some_model.id].join("") })

@reczy
Contributor Author

reczy commented May 19, 2021

@reczy thank you for thinking through this proposal. One thing that sticks out (though maybe not necessary for the implementation) is that database migrations need to be supportable/representable in schema.rb; I do not want to require end users to switch to structure.sql.

Not sure exactly how recent this is, but Postgres check constraints are supported through schema.rb (is this what you were referring to in the quote?). This is what the relevant part of my schema.rb looks like after the migration:

  create_table "good_jobs", id: :uuid, default: -> { "gen_random_uuid()" }, force: :cascade do |t|
    t.text "lock_key", default: -> { "substr(md5((random())::text), 1, 16)" }, null: false
    ...
    t.check_constraint "(length(lock_key) = 16) AND (lock_key ~ '^[0-9a-f]+$'::text)", name: "good_jobs_lock_key_constraint"
  end

It's maybe a bit of a diversion from where this Issue started, but I'd like to refocus on supporting the interface proposed in rails/rails#40337, which allows for a configurable level of concurrency rather than just 1. To do that, I think it would require making an additional query after a particular job has been locked to count the concurrency, and then aborting/retrying if concurrency is exceeded. I think this scheme would also require an additional indexed concurrency_key column

Sounds good to me @bensheldon. Just so I understand, are you suggesting that there would be 2 columns then, say lock_key and concurrency_key? So when:

  • concurrency not set: GoodJob would set a random lock_key and leave concurrency_key unset. This would then run like GoodJob runs today (try to get the lock, if succeed run the job, then unlock)
  • concurrency = 1: GoodJob would set the lock_key according to user input (say by taking the first 16 characters of a hash of a string they provide) and leave concurrency_key unset; Process then same as above
  • concurrency > 1: GoodJob would set a random lock_key for the job (random so the initial advisory lock for multiple jobs having the same concurrency_key can succeed and so the very same job is not run twice at the same time), and then set the concurrency_key according to user input. GoodJob would use a secondary mechanism (not an advisory lock) to check concurrency (perhaps by using the running scope to find how many jobs sharing the same concurrency_key are running) and if < concurrency limit, run the job
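
For that third case, I'm picturing roughly this kind of check (a sketch only; the concurrency_key column and accessor are placeholders for whatever we end up with, and advisory_locked is the existing Lockable scope):

# Count how many jobs sharing the concurrency_key are currently locked/running
def under_concurrency_limit?(good_job, limit)
  GoodJob::Job
    .where(concurrency_key: good_job.concurrency_key)
    .advisory_locked
    .count < limit
end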

Is that kinda what you were proposing?

though I'm imagining it would be a plaintext string that was up to the user to define (e.g. -> { |job| [job.class.name, job.some_model.id].join("") })

I think we are on close to the same page here. For the advisory lock (lock_key), this plaintext string would still have to be reduced to two 32-bit unsigned oid values (or one unsigned 64-bit number that could then be manually reduced to the two oid numbers for some of your queries). I was thinking we let users define the plaintext string but have GoodJob reduce it to a 16-hex-character string before saving it to Postgres (rather than saving the plaintext in the db and reducing it all the way to the two advisory lock oids at runtime within queries). Saving lock_key instead as a bigint (using positive and negative numbers) would be even better for reducing the size of the temporary rows table, but doing the conversions needed in some of the lockable.rb queries wasn't immediately clear to me.
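Concretely, the reduction step I have in mind is something like this (illustrative only; method name is a placeholder):

# Reduce the user-supplied plaintext to 16 hex characters before persisting
require "digest"

def reduce_lock_key(plaintext)
  Digest::MD5.hexdigest(plaintext)[0, 16]
end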

With respect to the concurrency_key, if advisory locks are not used, then I guess the format doesn't really matter (other than maybe nudging users not to use ridiculously long strings to help performance).

@bensheldon
Owner

bensheldon commented May 20, 2021

It looks like check_constraint was introduced in Rails 6.1 😞

I am thinking of not trying to achieve concurrency by modifying the fetch-and-lock code, and instead just querying separately for how many instances are active/locked, e.g. GoodJob::Job.where(concurrency_key: job.concurrency_key).advisory_locked.count, and aborting/retrying if the number exceeds the desired value.

The benefit is that this approach is drastically simpler, covers all the concurrency values, and is likely robust enough (I really don't know how problematic it is to join against pg_locks). The downside is that it's inefficient: it will load the job and, if the concurrency check fails, will have to re-enqueue the job with a delay (so it's inefficient on the worker query and adds latency from the retry).

@bensheldon
Owner

I did some noodling on this and I think this is a bare-bones implementation:

# Example usage:
class MyJob < ApplicationJob
  include GoodJob::ActiveJobExtensions::Concurrency

  good_job_enqueue_exclusively limit: 2
  good_job_perform_exclusively limit: 1

  def good_job_concurrency_key
    [self.class.name, @some_arg].join("")
  end
end

# Example implementation:
# TODO: GoodJob::Adapter#enqueue will have to call job.good_job_concurrency_key 
#       and store the value in the database record
module GoodJob::ActiveJobExtensions::Concurrency
  extend ActiveSupport::Concern

  ConcurrencyExceededError = Class.new(StandardError)

  included do
    retry_on GoodJob::ActiveJobExtensions::Concurrency::ConcurrencyExceededError, attempts: Float::INFINITY, wait: :exponentially_longer
  end
  
  class_methods do
    def good_job_enqueue_exclusively(limit: 1)
      before_enqueue do |job|
        enqueue_concurrency = GoodJob::Job.where(concurrency_key: job.good_job_concurrency_key).unfinished.count
        
        if enqueue_concurrency + 1 > limit
          # does this work?
          throw :abort
        end
      end
    end

    def good_job_perform_exclusively(limit: 1)
      before_perform do |job|
        perform_concurrency = GoodJob::Job.where(concurrency_key: job.good_job_concurrency_key).advisory_locked.count
        # the current job has already been locked and will appear in the previous query
        if perform_concurrency > limit
          raise GoodJob::ActiveJobExtensions::Concurrency::ConcurrencyExceededError
        end
      end
    end
  end
end

@bensheldon
Owner

@reczy I have an implementation of this in #281. I'd love feedback on the interface. It's currently two class-level methods, and an instance method:

class MyJob < ApplicationJob
  include GoodJob::ActiveJobExtensions::Concurrency

  good_job_enqueue_exclusively_with(limit: 1)
  good_job_perform_exclusively_with(limit: 1)

  def perform(name)
    # do work
  end

  def good_job_concurrency_key
    "Unique-#{arguments.first}" # MyJob.perform_later("Alice")  => "Unique-Alice"
   end
end

@reczy
Contributor Author

reczy commented Jun 24, 2021

@bensheldon Yeah, interface on this looks great! Having access to the job arguments was key in my own use-case, and this accomplishes that nicely. Offhand, I can't think of a scenario that good_job_concurrency_key can't account for.

I do have a question, though - what happens in the following scenario?

class MyJob < ApplicationJob
  include GoodJob::ActiveJobExtensions::Concurrency

  good_job_enqueue_exclusively_with(limit: 1)
  good_job_perform_exclusively_with(limit: 1)

  def perform(name)
    # do work
  end

  def good_job_concurrency_key
    "Unique-#{arguments.first}" # MyJob.perform_later("Alice")  => "Unique-Alice"
   end
end

class MyJob2 < ApplicationJob
  include GoodJob::ActiveJobExtensions::Concurrency

  good_job_enqueue_exclusively_with(limit: 5)
  good_job_perform_exclusively_with(limit: 5)

  def perform(name)
    # do work
  end

  def good_job_concurrency_key
    "Unique-#{arguments.first}" # MyJob.perform_later("Alice")  => "Unique-Alice"
   end
end

Note that the only differences above are the enqueue/perform limits (concurrency key is the same). This might occur when you are trying to lock jobs with a certain concurrency key across job classes (I happen to face this issue and need to keep certain job classes from being worked on at the same time for a given tenant).

Maybe this is a non-issue and you need only be concerned about the limits set for the job (or rather, its class) you're trying to add? Thoughts?

@bensheldon
Owner

@reczy thanks for all the feedback!

what happens in the following scenario?

Great example! MyJob would only be enqueued/performed if there were zero enqueued/performing jobs of any class with the "Unique-Alice" key, and MyJob2 only if there were fewer than 5 jobs of any class with that same concurrency key. It's weird because two different job classes can generate the same concurrency key but carry different limits; or, said another way: keys are global but limits are per-job.

I think two suggestions are:

  • have a parent-job both jobs inherit from (though I think the PR at this moment would struggle with that due to how the behavior is included)
  • use a constant in the parent class like MY_JOBS_ENQUEUE_LIMIT and assign that to each of them.

That could imply a different interface. Maybe something like:

class MyJob < ApplicationJob
  self.good_job_enqueue_exclusively_with limit: 2, key: -> { "Unique-#{arguments.first}" } # job.instance_eval it
  self.good_job_perform_exclusively_with limit: 2, key: -> { "Unique-#{arguments.first}" } 

  #... I guess it wouldn't work if the keys were different because a job can only have one key
  # maybe set them both at the same time?
  self.good_job_exclusively_with enqueue_limit: 2, perform_limit: 2, key: -> { "Unique-#{arguments.first}" } 

  # that would make it more easily inheritable, and you could also do something like this in multiple classes...
  MY_JOBS_EXCLUSIVELY = { perform_limit: 3, enqueue_limit: 3, key: -> { "Unique-#{arguments.first}" } }
  self.good_job_exclusively_with **MY_JOBS_EXCLUSIVELY
end

But I dunno 🤷‍♂️ What do you think of that?

@reczy
Contributor Author

reczy commented Jun 25, 2021

That makes sense.

I don't think either approach is better or worse in any meaningful way (for the developer experience, specifically), so I don't know that I personally have a strong opinion or recommendation on this particular aspect of the design (though if you do go with one of the interfaces from your last response, I think the one with enqueue_limit and perform_limit together is cleaner).

If you went with the original design, I could see myself using dedicated lock concerns for multi-class locks to keep one source of truth.

Maybe just go with whichever one is cleanest to implement in GoodJob internally?

@bensheldon
Owner

Thanks for that feedback! I think I'm going to go with the simplest option, and also the most clearly named option:

self.good_job_control_concurrency_with enqueue_limit: 2, perform_limit: 2, key: -> { "Unique-#{arguments.first}" } 
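In a job class, that would look something like this (based on the PR; exact option names may differ slightly in the released version):

class MyJob < ApplicationJob
  include GoodJob::ActiveJobExtensions::Concurrency

  good_job_control_concurrency_with(
    enqueue_limit: 2,
    perform_limit: 2,
    key: -> { "Unique-#{arguments.first}" }
  )

  def perform(name)
    # do work
  end
end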

@bensheldon
Owner

Concurrency controls have been released in GoodJob v1.11.0 🎉
