diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index adb6b9c8..eef17fa5 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -53,5 +53,3 @@ jobs: bin/rails db:setup - name: Run tests run: bin/rails test - - name: Run tests with separate connection - run: SEPARATE_CONNECTION=1 bin/rails test diff --git a/.gitignore b/.gitignore index b1dd6310..3cbf5eae 100644 --- a/.gitignore +++ b/.gitignore @@ -5,7 +5,7 @@ /tmp/ /test/dummy/db/*.sqlite3 /test/dummy/db/*.sqlite3-* -/test/dummy/log/*.log +/test/dummy/log/*.log* /test/dummy/tmp/ # Folder for JetBrains IDEs @@ -14,5 +14,8 @@ # Folder for Visual Studio Code /.vscode/ +# Files for RVM holdouts +.ruby-gemset + # misc .DS_Store diff --git a/.rubocop.yml b/.rubocop.yml index d395e1e9..75df1173 100644 --- a/.rubocop.yml +++ b/.rubocop.yml @@ -4,6 +4,6 @@ inherit_gem: { rubocop-rails-omakase: rubocop.yml } AllCops: - TargetRubyVersion: 3.0 + TargetRubyVersion: 3.3 Exclude: - - "test/dummy/db/schema.rb" + - "**/*_schema.rb" diff --git a/.ruby-version b/.ruby-version index bea438e9..fa7adc7a 100644 --- a/.ruby-version +++ b/.ruby-version @@ -1 +1 @@ -3.3.1 +3.3.5 diff --git a/Gemfile.lock b/Gemfile.lock index 5aeb2b79..36c358be 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -1,7 +1,7 @@ PATH remote: . 
specs: - solid_queue (1.0.0) + solid_queue (1.1.0) activejob (>= 7.1) activerecord (>= 7.1) concurrent-ruby (>= 1.3.1) @@ -12,9 +12,9 @@ PATH GEM remote: https://rubygems.org/ specs: - actionpack (7.1.3.4) - actionview (= 7.1.3.4) - activesupport (= 7.1.3.4) + actionpack (7.1.4.1) + actionview (= 7.1.4.1) + activesupport (= 7.1.4.1) nokogiri (>= 1.8.5) racc rack (>= 2.2.4) @@ -22,22 +22,22 @@ GEM rack-test (>= 0.6.3) rails-dom-testing (~> 2.2) rails-html-sanitizer (~> 1.6) - actionview (7.1.3.4) - activesupport (= 7.1.3.4) + actionview (7.1.4.1) + activesupport (= 7.1.4.1) builder (~> 3.1) erubi (~> 1.11) rails-dom-testing (~> 2.2) rails-html-sanitizer (~> 1.6) - activejob (7.1.3.4) - activesupport (= 7.1.3.4) + activejob (7.1.4.1) + activesupport (= 7.1.4.1) globalid (>= 0.3.6) - activemodel (7.1.3.4) - activesupport (= 7.1.3.4) - activerecord (7.1.3.4) - activemodel (= 7.1.3.4) - activesupport (= 7.1.3.4) + activemodel (7.1.4.1) + activesupport (= 7.1.4.1) + activerecord (7.1.4.1) + activemodel (= 7.1.4.1) + activesupport (= 7.1.4.1) timeout (>= 0.4.0) - activesupport (7.1.3.4) + activesupport (7.1.4.1) base64 bigdecimal concurrent-ruby (~> 1.0, >= 1.0.2) @@ -54,9 +54,10 @@ GEM concurrent-ruby (1.3.4) connection_pool (2.4.1) crass (1.0.6) - debug (1.7.1) - irb (>= 1.5.0) - reline (>= 0.3.1) + date (3.4.1) + debug (1.9.2) + irb (~> 1.10) + reline (>= 0.3.8) drb (2.2.1) erubi (1.13.0) et-orbi (1.2.11) @@ -66,46 +67,50 @@ GEM raabro (~> 1.4) globalid (1.2.1) activesupport (>= 6.1) - i18n (1.14.5) + i18n (1.14.6) concurrent-ruby (~> 1.0) - io-console (0.6.0) - irb (1.6.2) - reline (>= 0.3.0) - json (2.7.1) + io-console (0.7.2) + irb (1.14.1) + rdoc (>= 4.0.0) + reline (>= 0.4.2) + json (2.8.2) language_server-protocol (3.17.0.3) - loofah (2.22.0) + logger (1.6.2) + loofah (2.23.1) crass (~> 1.0.2) nokogiri (>= 1.12.0) - mini_portile2 (2.8.1) - minitest (5.24.0) + mini_portile2 (2.8.8) + minitest (5.25.2) mocha (2.1.0) ruby2_keywords (>= 0.0.5) - mutex_m (0.2.0) - 
mysql2 (0.5.4) - nio4r (2.7.0) - nokogiri (1.16.6-arm64-darwin) + mutex_m (0.3.0) + mysql2 (0.5.6) + nio4r (2.7.4) + nokogiri (1.16.7-arm64-darwin) racc (~> 1.4) - nokogiri (1.16.6-x86_64-darwin) + nokogiri (1.16.7-x86_64-darwin) racc (~> 1.4) - nokogiri (1.16.6-x86_64-linux) + nokogiri (1.16.7-x86_64-linux) racc (~> 1.4) - parallel (1.24.0) - parser (3.3.0.5) + parallel (1.26.3) + parser (3.3.6.0) ast (~> 2.4.1) racc pg (1.5.4) - puma (6.4.2) + psych (5.2.1) + date + stringio + puma (6.4.3) nio4r (~> 2.0) raabro (1.4.0) - racc (1.8.0) - rack (3.1.5) + racc (1.8.1) + rack (3.1.8) rack-session (2.0.0) rack (>= 3.0.0) rack-test (2.1.0) rack (>= 1.3) - rackup (2.1.0) + rackup (2.2.1) rack (>= 3) - webrick (~> 1.8) rails-dom-testing (2.2.0) activesupport (>= 5.0.0) minitest @@ -113,44 +118,43 @@ GEM rails-html-sanitizer (1.6.0) loofah (~> 2.21) nokogiri (~> 1.14) - railties (7.1.3.4) - actionpack (= 7.1.3.4) - activesupport (= 7.1.3.4) + railties (7.1.4.1) + actionpack (= 7.1.4.1) + activesupport (= 7.1.4.1) irb rackup (>= 1.0.0) rake (>= 12.2) thor (~> 1.0, >= 1.2.2) zeitwerk (~> 2.6) rainbow (3.1.1) - rake (13.0.6) - regexp_parser (2.9.0) - reline (0.3.2) + rake (13.2.1) + rdoc (6.8.1) + psych (>= 4.0.0) + regexp_parser (2.9.2) + reline (0.5.12) io-console (~> 0.5) - rexml (3.3.6) - strscan - rubocop (1.62.1) + rubocop (1.69.0) json (~> 2.3) language_server-protocol (>= 3.17.0) parallel (~> 1.10) parser (>= 3.3.0.2) rainbow (>= 2.2.2, < 4.0) - regexp_parser (>= 1.8, < 3.0) - rexml (>= 3.2.5, < 4.0) - rubocop-ast (>= 1.31.1, < 2.0) + regexp_parser (>= 2.4, < 3.0) + rubocop-ast (>= 1.36.1, < 2.0) ruby-progressbar (~> 1.7) - unicode-display_width (>= 2.4.0, < 3.0) - rubocop-ast (1.31.2) - parser (>= 3.3.0.4) - rubocop-minitest (0.35.0) + unicode-display_width (>= 2.4.0, < 4.0) + rubocop-ast (1.36.2) + parser (>= 3.3.1.0) + rubocop-minitest (0.36.0) rubocop (>= 1.61, < 2.0) rubocop-ast (>= 1.31.1, < 2.0) - rubocop-performance (1.21.0) + rubocop-performance (1.23.0) 
rubocop (>= 1.48.1, < 2.0) rubocop-ast (>= 1.31.1, < 2.0) - rubocop-rails (2.24.1) + rubocop-rails (2.27.0) activesupport (>= 4.2.0) rack (>= 1.1) - rubocop (>= 1.33.0, < 2.0) + rubocop (>= 1.52.0, < 2.0) rubocop-ast (>= 1.31.1, < 2.0) rubocop-rails-omakase (1.0.0) rubocop @@ -161,28 +165,32 @@ GEM ruby2_keywords (0.0.5) sqlite3 (1.5.4) mini_portile2 (~> 2.8.0) - strscan (3.1.0) - thor (1.3.1) - timeout (0.4.1) + stringio (3.1.2) + thor (1.3.2) + timeout (0.4.2) tzinfo (2.0.6) concurrent-ruby (~> 1.0) - unicode-display_width (2.5.0) - webrick (1.8.1) - zeitwerk (2.6.12) + unicode-display_width (3.1.2) + unicode-emoji (~> 4.0, >= 4.0.4) + unicode-emoji (4.0.4) + zeitwerk (2.7.1) PLATFORMS arm64-darwin-22 arm64-darwin-23 + arm64-darwin-24 x86_64-darwin-21 x86_64-darwin-23 x86_64-linux DEPENDENCIES - debug + debug (~> 1.9) + logger mocha mysql2 pg puma + rdoc rubocop-rails-omakase solid_queue! sqlite3 diff --git a/README.md b/README.md index 38b2cf57..545c5f00 100644 --- a/README.md +++ b/README.md @@ -6,6 +6,31 @@ Besides regular job enqueuing and processing, Solid Queue supports delayed jobs, Solid Queue can be used with SQL databases such as MySQL, PostgreSQL or SQLite, and it leverages the `FOR UPDATE SKIP LOCKED` clause, if available, to avoid blocking and waiting on locks when polling jobs. It relies on Active Job for retries, discarding, error handling, serialization, or delays, and it's compatible with Ruby on Rails's multi-threading. 
+## Table of contents + +- [Installation](#installation) + - [Single database configuration](#single-database-configuration) + - [Incremental adoption](#incremental-adoption) + - [High performance requirements](#high-performance-requirements) +- [Configuration](#configuration) + - [Workers, dispatchers and scheduler](#workers-dispatchers-and-scheduler) + - [Queue order and priorities](#queue-order-and-priorities) + - [Queues specification and performance](#queues-specification-and-performance) + - [Threads, processes and signals](#threads-processes-and-signals) + - [Database configuration](#database-configuration) + - [Other configuration settings](#other-configuration-settings) +- [Lifecycle hooks](#lifecycle-hooks) +- [Errors when enqueuing](#errors-when-enqueuing) +- [Concurrency controls](#concurrency-controls) +- [Failed jobs and retries](#failed-jobs-and-retries) + - [Error reporting on jobs](#error-reporting-on-jobs) +- [Puma plugin](#puma-plugin) +- [Jobs and transactional integrity](#jobs-and-transactional-integrity) +- [Recurring tasks](#recurring-tasks) +- [Inspiration](#inspiration) +- [License](#license) + + ## Installation Solid Queue is configured by default in new Rails 8 applications. But if you're running an earlier version, you can add it manually following these steps: @@ -95,7 +120,7 @@ Running Solid Queue in a separate database is recommended, but it's also possibl You won't have multiple databases, so `database.yml` doesn't need to have primary and queue database. 
-## Incremental adoption +### Incremental adoption If you're planning to adopt Solid Queue incrementally by switching one job at the time, you can do so by leaving the `config.active_job.queue_adapter` set to your old backend, and then set the `queue_adapter` directly in the jobs you're moving: @@ -108,7 +133,7 @@ class MyJob < ApplicationJob end ``` -## High performance requirements +### High performance requirements Solid Queue was designed for the highest throughput when used with MySQL 8+ or PostgreSQL 9.5+, as they support `FOR UPDATE SKIP LOCKED`. You can use it with older versions, but in that case, you might run into lock waits if you run multiple workers for the same queue. You can also use it with SQLite on smaller applications. @@ -180,7 +205,11 @@ Here's an overview of the different options: This will create a worker fetching jobs from all queues starting with `staging`. The wildcard `*` is only allowed on its own or at the end of a queue name; you can't specify queue names such as `*_some_queue`. These will be ignored. Finally, you can combine prefixes with exact names, like `[ staging*, background ]`, and the behaviour with respect to order will be the same as with only exact names. + + Check the sections below on [how queue order behaves combined with priorities](#queue-order-and-priorities), and [how the way you specify the queues per worker might affect performance](#queues-specification-and-performance). + - `threads`: this is the max size of the thread pool that each worker will have to run jobs. Each worker will fetch this number of jobs from their queue(s), at most and will post them to the thread pool to be run. By default, this is `3`. Only workers have this setting. +It is recommended to set this value less than or equal to the queue database's connection pool size minus 2, as each worker thread uses one connection, and two additional connections are reserved for polling and heartbeat. 
- `processes`: this is the number of worker processes that will be forked by the supervisor with the settings given. By default, this is `1`, just a single process. This setting is useful if you want to dedicate more than one CPU core to a queue or queues with the same configuration. Only workers have this setting. - `concurrency_maintenance`: whether the dispatcher will perform the concurrency maintenance work. This is `true` by default, and it's useful if you don't use any [concurrency controls](#concurrency-controls) and want to disable it or if you run multiple dispatchers and want some of them to just dispatch jobs without doing anything else. @@ -195,6 +224,67 @@ This is useful when you run jobs with different importance or urgency in the sam We recommend not mixing queue order with priorities but either choosing one or the other, as that will make job execution order more straightforward for you. +### Queues specification and performance + +To keep polling performant and ensure a covering index is always used, Solid Queue only does two types of polling queries: +```sql +-- No filtering by queue +SELECT job_id +FROM solid_queue_ready_executions +ORDER BY priority ASC, job_id ASC +LIMIT ? +FOR UPDATE SKIP LOCKED; + +-- Filtering by a single queue +SELECT job_id +FROM solid_queue_ready_executions +WHERE queue_name = ? +ORDER BY priority ASC, job_id ASC +LIMIT ? +FOR UPDATE SKIP LOCKED; +``` + +The first one (no filtering by queue) is used when you specify +```yml +queues: * +``` +and there aren't any queues paused, as we want to target all queues. + +In other cases, we need to have a list of queues to filter by, in order, because we can only filter by a single queue at a time to ensure we use an index to sort. 
This means that if you specify your queues as: +```yml +queues: beta* +``` + +we'll need to get a list of all existing queues matching that prefix first, with a query that would look like this: +```sql +SELECT DISTINCT(queue_name) +FROM solid_queue_ready_executions +WHERE queue_name LIKE 'beta%'; +``` + +This type of `DISTINCT` query on a column that's the leftmost column in an index can be performed very fast in MySQL thanks to a technique called [Loose Index Scan](https://dev.mysql.com/doc/refman/8.0/en/group-by-optimization.html#loose-index-scan). PostgreSQL and SQLite, however, don't implement this technique, which means that if your `solid_queue_ready_executions` table is very big because your queues get very deep, this query will get slow. Normally your `solid_queue_ready_executions` table will be small, but it can happen. + +Similarly to using prefixes, the same will happen if you have paused queues, because we need to get a list of all queues with a query like +```sql +SELECT DISTINCT(queue_name) +FROM solid_queue_ready_executions +``` + +and then remove the paused ones. Pausing in general should be something rare, used in special circumstances, and for a short period of time. If you don't want to process jobs from a queue anymore, the best way to do that is to remove it from your list of queues. + +💡 To sum up, **if you want to ensure optimal performance on polling**, the best way to do that is to always specify exact names for them, and not have any queues paused. + +Do this: + +```yml +queues: background, backend +``` + +instead of this: +```yml +queues: back* +``` + ### Threads, processes and signals @@ -206,7 +296,9 @@ The supervisor is in charge of managing these processes, and it responds to the When receiving a `QUIT` signal, if workers still have jobs in-flight, these will be returned to the queue when the processes are deregistered. -If processes have no chance of cleaning up before exiting (e.g. 
if someone pulls a cable somewhere), in-flight jobs might remain claimed by the processes executing them. Processes send heartbeats, and the supervisor checks and prunes processes with expired heartbeats, which will release any claimed jobs back to their queues. You can configure both the frequency of heartbeats and the threshold to consider a process dead. See the section below for this. +If processes have no chance of cleaning up before exiting (e.g. if someone pulls a cable somewhere), in-flight jobs might remain claimed by the processes executing them. Processes send heartbeats, and the supervisor checks and prunes processes with expired heartbeats. Jobs that were claimed by processes with an expired heartbeat will be marked as failed with a `SolidQueue::Processes::ProcessPrunedError`. You can configure both the frequency of heartbeats and the threshold to consider a process dead. See the section below for this. + +In a similar way, if a worker is terminated in any other way not initiated by the above signals (e.g. a worker is sent a `KILL` signal), jobs in progress will be marked as failed so that they can be inspected, with a `SolidQueue::Processes::Process::ProcessExitError`. Sometimes a job in particular is responsible for this, for example, if it has a memory leak and you have a mechanism to kill processes over a certain memory threshold, so this will help identifying this kind of situation. ### Database configuration @@ -215,6 +307,32 @@ You can configure the database used by Solid Queue via the `config.solid_queue.c All the options available to Active Record for multiple databases can be used here. +### Other configuration settings + +_Note_: The settings in this section should be set in your `config/application.rb` or your environment config like this: `config.solid_queue.silence_polling = true` + +There are several settings that control how Solid Queue works that you can set as well: +- `logger`: the logger you want Solid Queue to use. 
Defaults to the app logger. +- `app_executor`: the [Rails executor](https://guides.rubyonrails.org/threading_and_code_execution.html#executor) used to wrap asynchronous operations, defaults to the app executor +- `on_thread_error`: custom lambda/Proc to call when there's an error within a Solid Queue thread that takes the exception raised as argument. Defaults to + + ```ruby + -> (exception) { Rails.error.report(exception, handled: false) } + ``` + + **This is not used for errors raised within a job execution**. Errors happening in jobs are handled by Active Job's `retry_on` or `discard_on`, and ultimately will result in [failed jobs](#failed-jobs-and-retries). This is for errors happening within Solid Queue itself. + +- `use_skip_locked`: whether to use `FOR UPDATE SKIP LOCKED` when performing locking reads. This will be automatically detected in the future, and for now, you'd only need to set this to `false` if your database doesn't support it. For MySQL, that'd be versions < 8, and for PostgreSQL, versions < 9.5. If you use SQLite, this has no effect, as writes are sequential. +- `process_heartbeat_interval`: the heartbeat interval that all processes will follow—defaults to 60 seconds. +- `process_alive_threshold`: how long to wait until a process is considered dead after its last heartbeat—defaults to 5 minutes. +- `shutdown_timeout`: time the supervisor will wait since it sent the `TERM` signal to its supervised processes before sending a `QUIT` version to them requesting immediate termination—defaults to 5 seconds. +- `silence_polling`: whether to silence Active Record logs emitted when polling for both workers and dispatchers—defaults to `true`. +- `supervisor_pidfile`: path to a pidfile that the supervisor will create when booting to prevent running more than one supervisor in the same host, or in case you want to use it for a health check. It's `nil` by default. 
+- `preserve_finished_jobs`: whether to keep finished jobs in the `solid_queue_jobs` table—defaults to `true`. +- `clear_finished_jobs_after`: period to keep finished jobs around, in case `preserve_finished_jobs` is true—defaults to 1 day. **Note:** Right now, there's no automatic cleanup of finished jobs. You'd need to do this by periodically invoking `SolidQueue::Job.clear_finished_in_batches`, but this will happen automatically in the near future. +- `default_concurrency_control_period`: the value to be used as the default for the `duration` parameter in [concurrency controls](#concurrency-controls). It defaults to 3 minutes. + + ## Lifecycle hooks In Solid queue, you can hook into two different points in the supervisor's life: @@ -242,30 +360,6 @@ SolidQueue.on_stop { stop_metrics_server } These can be called several times to add multiple hooks, but it needs to happen before Solid Queue is started. An initializer would be a good place to do this. -### Other configuration settings - -_Note_: The settings in this section should be set in your `config/application.rb` or your environment config like this: `config.solid_queue.silence_polling = true` - -There are several settings that control how Solid Queue works that you can set as well: -- `logger`: the logger you want Solid Queue to use. Defaults to the app logger. -- `app_executor`: the [Rails executor](https://guides.rubyonrails.org/threading_and_code_execution.html#executor) used to wrap asynchronous operations, defaults to the app executor -- `on_thread_error`: custom lambda/Proc to call when there's an error within a Solid Queue thread that takes the exception raised as argument. Defaults to - - ```ruby - -> (exception) { Rails.error.report(exception, handled: false) } - ``` - - **This is not used for errors raised within a job execution**. Errors happening in jobs are handled by Active Job's `retry_on` or `discard_on`, and ultimately will result in [failed jobs](#failed-jobs-and-retries). 
This is for errors happening within Solid Queue itself. - -- `use_skip_locked`: whether to use `FOR UPDATE SKIP LOCKED` when performing locking reads. This will be automatically detected in the future, and for now, you'd only need to set this to `false` if your database doesn't support it. For MySQL, that'd be versions < 8, and for PostgreSQL, versions < 9.5. If you use SQLite, this has no effect, as writes are sequential. -- `process_heartbeat_interval`: the heartbeat interval that all processes will follow—defaults to 60 seconds. -- `process_alive_threshold`: how long to wait until a process is considered dead after its last heartbeat—defaults to 5 minutes. -- `shutdown_timeout`: time the supervisor will wait since it sent the `TERM` signal to its supervised processes before sending a `QUIT` version to them requesting immediate termination—defaults to 5 seconds. -- `silence_polling`: whether to silence Active Record logs emitted when polling for both workers and dispatchers—defaults to `true`. -- `supervisor_pidfile`: path to a pidfile that the supervisor will create when booting to prevent running more than one supervisor in the same host, or in case you want to use it for a health check. It's `nil` by default. -- `preserve_finished_jobs`: whether to keep finished jobs in the `solid_queue_jobs` table—defaults to `true`. -- `clear_finished_jobs_after`: period to keep finished jobs around, in case `preserve_finished_jobs` is true—defaults to 1 day. **Note:** Right now, there's no automatic cleanup of finished jobs. You'd need to do this by periodically invoking `SolidQueue::Job.clear_finished_in_batches`, but this will happen automatically in the near future. -- `default_concurrency_control_period`: the value to be used as the default for the `duration` parameter in [concurrency controls](#concurrency-controls). It defaults to 3 minutes. ## Errors when enqueuing @@ -377,15 +471,27 @@ plugin :solid_queue ``` to your `puma.rb` configuration. 
+If you're using Puma in development but you don't want to use Solid Queue in development, make sure you avoid the plugin being used, for example using an environment variable like this: +```ruby +plugin :solid_queue if ENV["SOLID_QUEUE_IN_PUMA"] +``` +that you set in production only. This is what Rails 8's default Puma config looks like. Otherwise, if you're using Puma in development but not Solid Queue, starting Puma would also start the Solid Queue supervisor and it'll most likely fail because it won't be properly configured. + ## Jobs and transactional integrity -:warning: Having your jobs in the same ACID-compliant database as your application data enables a powerful yet sharp tool: taking advantage of transactional integrity to ensure some action in your app is not committed unless your job is also committed and viceversa, and ensuring that your job won't be enqueued until the transaction within which you're enqueing it is committed. This can be very powerful and useful, but it can also backfire if you base some of your logic on this behaviour, and in the future, you move to another active job backend, or if you simply move Solid Queue to its own database, and suddenly the behaviour changes under you. +:warning: Having your jobs in the same ACID-compliant database as your application data enables a powerful yet sharp tool: taking advantage of transactional integrity to ensure some action in your app is not committed unless your job is also committed and vice versa, and ensuring that your job won't be enqueued until the transaction within which you're enqueuing it is committed. This can be very powerful and useful, but it can also backfire if you base some of your logic on this behaviour, and in the future, you move to another active job backend, or if you simply move Solid Queue to its own database, and suddenly the behaviour changes under you. 
Because this can be quite tricky and many people shouldn't need to worry about it, by default Solid Queue is configured in a different database as the main app. -Because this can be quite tricky and many people shouldn't need to worry about it, by default Solid Queue is configured in a different database as the main app, **job enqueuing is deferred until any ongoing transaction is committed** thanks to Active Job's built-in capability to do this. This means that even if you run Solid Queue in the same DB as your app, you won't be taking advantage of this transactional integrity. +Starting from Rails 8, an option which doesn't rely on this transactional integrity and which Active Job provides is to defer the enqueueing of a job inside an Active Record transaction until that transaction successfully commits. This option can be set via the [`enqueue_after_transaction_commit`](https://edgeapi.rubyonrails.org/classes/ActiveJob/Enqueuing.html#method-c-enqueue_after_transaction_commit) class method on the job level and is by default disabled. Either it can be enabled for individual jobs or for all jobs through `ApplicationJob`: -If you prefer to change this, you can set [`config.active_job.enqueue_after_transaction_commit`](https://edgeguides.rubyonrails.org/configuring.html#config-active-job-enqueue-after-transaction-commit) to `never`. You can also set this on a per-job basis. +```ruby +class ApplicationJob < ActiveJob::Base + self.enqueue_after_transaction_commit = true +end +``` + +Using this option, you can also use Solid Queue in the same database as your app but not rely on transactional integrity. 
-If you set that to `never` but still want to make sure you're not inadvertently on transactional integrity, you can make sure that: +If you don't set this option but still want to make sure you're not inadvertently on transactional integrity, you can make sure that: - Your jobs relying on specific data are always enqueued on [`after_commit` callbacks](https://guides.rubyonrails.org/active_record_callbacks.html#after-commit-and-after-rollback) or otherwise from a place where you're certain that whatever data the job will use has been committed to the database before the job is enqueued. - Or, you configure a different database for Solid Queue, even if it's the same as your app, ensuring that a different connection on the thread handling requests or running jobs for your app will be used to enqueue jobs. For example: @@ -400,6 +506,7 @@ If you set that to `never` but still want to make sure you're not inadvertently config.solid_queue.connects_to = { database: { writing: :primary, reading: :replica } } ``` + ## Recurring tasks Solid Queue supports defining recurring tasks that run at specific times in the future, on a regular basis like cron jobs. These are managed by the scheduler process and are defined in their own configuration file. By default, the file is located in `config/recurring.yml`, but you can set a different path using the environment variable `SOLID_QUEUE_RECURRING_SCHEDULE` or by using the `--recurring_schedule_file` option with `bin/jobs`, like this: @@ -435,9 +542,15 @@ MyJob.perform_later(42, status: "custom_status") - `priority`: a numeric priority value to be used when enqueuing the job. - Tasks are enqueued at their corresponding times by the scheduler, and each task schedules the next one. This is pretty much [inspired by what GoodJob does](https://github.com/bensheldon/good_job/blob/994ecff5323bf0337e10464841128fda100750e6/lib/good_job/cron_manager.rb). 
+For recurring tasks defined as a `command`, you can also change the job class that runs them as follows: +```ruby +Rails.application.config.after_initialize do # or to_prepare + SolidQueue::RecurringTask.default_job_class = MyRecurringCommandJob +end +``` + It's possible to run multiple schedulers with the same `recurring_tasks` configuration, for example, if you have multiple servers for redundancy, and you run the `scheduler` in more than one of them. To avoid enqueuing duplicate tasks at the same time, an entry in a new `solid_queue_recurring_executions` table is created in the same transaction as the job is enqueued. This table has a unique index on `task_key` and `run_at`, ensuring only one entry per task per time will be created. This only works if you have `preserve_finished_jobs` set to `true` (the default), and the guarantee applies as long as you keep the jobs around. **Note**: a single recurring schedule is supported, so you can have multiple schedulers using the same schedule, but not multiple schedulers using different configurations. 
diff --git a/app/models/solid_queue/claimed_execution.rb b/app/models/solid_queue/claimed_execution.rb index d4abf45a..c2b13909 100644 --- a/app/models/solid_queue/claimed_execution.rb +++ b/app/models/solid_queue/claimed_execution.rb @@ -64,6 +64,7 @@ def perform finished else failed_with(result.error) + raise result.error end ensure job.unblock_next_blocked_job diff --git a/app/models/solid_queue/queue.rb b/app/models/solid_queue/queue.rb index 4cf05bfb..7968d395 100644 --- a/app/models/solid_queue/queue.rb +++ b/app/models/solid_queue/queue.rb @@ -40,6 +40,19 @@ def size @size ||= ReadyExecution.queued_as(name).count end + def latency + @latency ||= begin + now = Time.current + oldest_enqueued_at = ReadyExecution.queued_as(name).minimum(:created_at) || now + + (now - oldest_enqueued_at).to_i + end + end + + def human_latency + ActiveSupport::Duration.build(latency).inspect + end + def ==(queue) name == queue.name end diff --git a/app/models/solid_queue/queue_selector.rb b/app/models/solid_queue/queue_selector.rb index 77849056..24f6a6ad 100644 --- a/app/models/solid_queue/queue_selector.rb +++ b/app/models/solid_queue/queue_selector.rb @@ -34,7 +34,7 @@ def queue_names def eligible_queues if include_all_queues? then all_queues else - exact_names + prefixed_names + in_raw_order(exact_names + prefixed_names) end end @@ -42,8 +42,12 @@ def include_all_queues? "*".in? 
raw_queues end + def all_queues + relation.distinct(:queue_name).pluck(:queue_name) + end + def exact_names - raw_queues.select { |queue| !queue.include?("*") } + raw_queues.select { |queue| exact_name?(queue) } end def prefixed_names @@ -54,15 +58,41 @@ def prefixed_names end def prefixes - @prefixes ||= raw_queues.select { |queue| queue.ends_with?("*") }.map { |queue| queue.tr("*", "%") } + @prefixes ||= raw_queues.select { |queue| prefixed_name?(queue) }.map { |queue| queue.tr("*", "%") } end - def all_queues - relation.distinct(:queue_name).pluck(:queue_name) + def exact_name?(queue) + !queue.include?("*") + end + + def prefixed_name?(queue) + queue.ends_with?("*") end def paused_queues @paused_queues ||= Pause.all.pluck(:queue_name) end + + def in_raw_order(queues) + # Only need to sort if we have prefixes and more than one queue name. + # Exact names are selected in the same order as they're found + if queues.one? || prefixes.empty? + queues + else + queues = queues.dup + raw_queues.flat_map { |raw_queue| delete_in_order(raw_queue, queues) }.compact + end + end + + def delete_in_order(raw_queue, queues) + if exact_name?(raw_queue) + queues.delete(raw_queue) + elsif prefixed_name?(raw_queue) + prefix = raw_queue.tr("*", "") + queues.select { |queue| queue.start_with?(prefix) }.tap do |matches| + queues -= matches + end + end + end end end diff --git a/app/models/solid_queue/recurring_task.rb b/app/models/solid_queue/recurring_task.rb index d1016991..54777531 100644 --- a/app/models/solid_queue/recurring_task.rb +++ b/app/models/solid_queue/recurring_task.rb @@ -67,11 +67,15 @@ def enqueue(at:) end end - payload[:active_job_id] = active_job.job_id if active_job + active_job.tap do |enqueued_job| + payload[:active_job_id] = enqueued_job.job_id + end rescue RecurringExecution::AlreadyRecorded payload[:skipped] = true + false rescue Job::EnqueueError => error payload[:enqueue_error] = error.message + false end end diff --git a/bin/setup b/bin/setup index 
b3e163ab..5fbe3e57 100755 --- a/bin/setup +++ b/bin/setup @@ -2,11 +2,19 @@ set -eu cd "$(dirname "${BASH_SOURCE[0]}")" -docker-compose up -d --remove-orphans -docker-compose ps +if docker compose version &> /dev/null; then + DOCKER_COMPOSE_CMD="docker compose" +else + DOCKER_COMPOSE_CMD="docker-compose" +fi + +$DOCKER_COMPOSE_CMD up -d --remove-orphans +$DOCKER_COMPOSE_CMD ps bundle echo "Creating databases..." -rails db:reset +rails db:reset TARGET_DB=sqlite +rails db:reset TARGET_DB=mysql +rails db:reset TARGET_DB=postgres diff --git a/lib/generators/solid_queue/install/install_generator.rb b/lib/generators/solid_queue/install/install_generator.rb index 3d57391b..353b1094 100644 --- a/lib/generators/solid_queue/install/install_generator.rb +++ b/lib/generators/solid_queue/install/install_generator.rb @@ -11,9 +11,11 @@ def copy_files chmod "bin/jobs", 0755 & ~File.umask, verbose: false end - def configure_active_job_adapter - gsub_file Pathname(destination_root).join("config/environments/production.rb"), - /(# )?config\.active_job\.queue_adapter\s+=.*/, + def configure_adapter_and_database + pathname = Pathname(destination_root).join("config/environments/production.rb") + + gsub_file pathname, /\n\s*config\.solid_queue\.connects_to\s+=.*\n/, "\n", verbose: false + gsub_file pathname, /(# )?config\.active_job\.queue_adapter\s+=.*\n/, "config.active_job.queue_adapter = :solid_queue\n" + " config.solid_queue.connects_to = { database: { writing: :queue } }\n" end diff --git a/lib/solid_queue/app_executor.rb b/lib/solid_queue/app_executor.rb index da0976fe..0580213f 100644 --- a/lib/solid_queue/app_executor.rb +++ b/lib/solid_queue/app_executor.rb @@ -4,7 +4,7 @@ module SolidQueue module AppExecutor def wrap_in_app_executor(&block) if SolidQueue.app_executor - SolidQueue.app_executor.wrap(&block) + SolidQueue.app_executor.wrap(source: "application.solid_queue", &block) else yield end diff --git a/lib/solid_queue/configuration.rb b/lib/solid_queue/configuration.rb 
index 96e732c3..bd238dcc 100644 --- a/lib/solid_queue/configuration.rb +++ b/lib/solid_queue/configuration.rb @@ -2,6 +2,12 @@ module SolidQueue class Configuration + include ActiveModel::Model + + validate :ensure_configured_processes + validate :ensure_valid_recurring_tasks + validate :ensure_correctly_sized_thread_pool + class Process < Struct.new(:kind, :attributes) def instantiate "SolidQueue::#{kind.to_s.titleize}".safe_constantize.new(**attributes) @@ -36,14 +42,46 @@ def configured_processes end end - def max_number_of_threads - # At most "threads" in each worker + 1 thread for the worker + 1 thread for the heartbeat task - workers_options.map { |options| options[:threads] }.max + 2 + def error_messages + if configured_processes.none? + "No workers or processed configured. Exiting..." + else + error_messages = invalid_tasks.map do |task| + all_messages = task.errors.full_messages.map { |msg| "\t#{msg}" }.join("\n") + "#{task.key}:\n#{all_messages}" + end + .join("\n") + + "Invalid processes configured:\n#{error_messages}" + end end private attr_reader :options + def ensure_configured_processes + unless configured_processes.any? + errors.add(:base, "No processes configured") + end + end + + def ensure_valid_recurring_tasks + unless skip_recurring_tasks? || invalid_tasks.none? + error_messages = invalid_tasks.map do |task| + "- #{task.key}: #{task.errors.full_messages.join(", ")}" + end + + errors.add(:base, "Invalid recurring tasks:\n#{error_messages.join("\n")}") + end + end + + def ensure_correctly_sized_thread_pool + if (db_pool_size = SolidQueue::Record.connection_pool&.size) && db_pool_size < estimated_number_of_threads + errors.add(:base, "Solid Queue is configured to use #{estimated_number_of_threads} threads but the " + + "database connection pool is #{db_pool_size}. 
Increase it in `config/database.yml`") + end + end + def default_options { config_file: Rails.root.join(ENV["SOLID_QUEUE_CONFIG"] || DEFAULT_CONFIG_FILE_PATH), @@ -54,6 +92,10 @@ def default_options } end + def invalid_tasks + recurring_tasks.select(&:invalid?) + end + def only_work? options[:only_work] end @@ -100,7 +142,7 @@ def dispatchers_options def recurring_tasks @recurring_tasks ||= recurring_tasks_config.map do |id, options| RecurringTask.from_configuration(id, **options) - end.select(&:valid?) + end end def processes_config @@ -147,5 +189,11 @@ def load_config_from_file(file) {} end end + + def estimated_number_of_threads + # At most "threads" in each worker + 1 thread for the worker + 1 thread for the heartbeat task + thread_count = workers_options.map { |options| options.fetch(:threads, WORKER_DEFAULTS[:threads]) }.max + (thread_count || 1) + 2 + end end end diff --git a/lib/solid_queue/dispatcher.rb b/lib/solid_queue/dispatcher.rb index a22a82d8..fb988075 100644 --- a/lib/solid_queue/dispatcher.rb +++ b/lib/solid_queue/dispatcher.rb @@ -24,7 +24,8 @@ def metadata private def poll batch = dispatch_next_batch - batch.size + + batch.size.zero? ? 
polling_interval : 0.seconds end def dispatch_next_batch diff --git a/lib/solid_queue/log_subscriber.rb b/lib/solid_queue/log_subscriber.rb index 3d2ec02c..96fb19bf 100644 --- a/lib/solid_queue/log_subscriber.rb +++ b/lib/solid_queue/log_subscriber.rb @@ -145,6 +145,7 @@ def unhandled_signal_error(event) end def replace_fork(event) + supervisor_pid = event.payload[:supervisor_pid] status = event.payload[:status] attributes = event.payload.slice(:pid).merge \ status: (status.exitstatus || "no exit status set"), @@ -155,7 +156,7 @@ def replace_fork(event) if replaced_fork = event.payload[:fork] info formatted_event(event, action: "Replaced terminated #{replaced_fork.kind}", **attributes.merge(hostname: replaced_fork.hostname, name: replaced_fork.name)) - else + elsif supervisor_pid != 1 # Running Docker, possibly having some processes that have been reparented warn formatted_event(event, action: "Tried to replace forked process but it had already died", **attributes) end end diff --git a/lib/solid_queue/processes/interruptible.rb b/lib/solid_queue/processes/interruptible.rb index 67173aeb..3bff1dd9 100644 --- a/lib/solid_queue/processes/interruptible.rb +++ b/lib/solid_queue/processes/interruptible.rb @@ -7,31 +7,27 @@ def wake_up end private - SELF_PIPE_BLOCK_SIZE = 11 def interrupt - self_pipe[:writer].write_nonblock(".") - rescue Errno::EAGAIN, Errno::EINTR - # Ignore writes that would block and retry - # if another signal arrived while writing - retry + queue << true end + # Sleeps for 'time'. Can be interrupted asynchronously and return early via wake_up. + # @param time [Numeric] the time to sleep. 0 returns immediately. + # @return [true, nil] + # * returns `true` if an interrupt was requested via #wake_up between the + # last call to `interruptible_sleep` and now, resulting in an early return. + # * returns `nil` if it slept the full `time` and was not interrupted. 
def interruptible_sleep(time) - if time > 0 && self_pipe[:reader].wait_readable(time) - loop { self_pipe[:reader].read_nonblock(SELF_PIPE_BLOCK_SIZE) } - end - rescue Errno::EAGAIN, Errno::EINTR + # Invoking this from the main thread may result in significant slowdown. + # Utilizing asynchronous execution (Futures) addresses this performance issue. + Concurrent::Promises.future(time) do |timeout| + queue.pop(timeout:).tap { queue.clear } + end.value end - # Self-pipe for signal-handling (http://cr.yp.to/docs/selfpipe.html) - def self_pipe - @self_pipe ||= create_self_pipe - end - - def create_self_pipe - reader, writer = IO.pipe - { reader: reader, writer: writer } + def queue + @queue ||= Queue.new end end end diff --git a/lib/solid_queue/processes/poller.rb b/lib/solid_queue/processes/poller.rb index bf5a7450..75df6104 100644 --- a/lib/solid_queue/processes/poller.rb +++ b/lib/solid_queue/processes/poller.rb @@ -25,11 +25,11 @@ def start_loop loop do break if shutting_down? - wrap_in_app_executor do - unless poll > 0 - interruptible_sleep(polling_interval) - end + delay = wrap_in_app_executor do + poll end + + interruptible_sleep(delay) end ensure SolidQueue.instrument(:shutdown_process, process: self) do diff --git a/lib/solid_queue/scheduler/recurring_schedule.rb b/lib/solid_queue/scheduler/recurring_schedule.rb index 5b8ff6bb..4070a0ec 100644 --- a/lib/solid_queue/scheduler/recurring_schedule.rb +++ b/lib/solid_queue/scheduler/recurring_schedule.rb @@ -41,6 +41,7 @@ def task_keys private def persist_tasks + SolidQueue::RecurringTask.static.where.not(key: task_keys).delete_all SolidQueue::RecurringTask.create_or_update_all configured_tasks end diff --git a/lib/solid_queue/supervisor.rb b/lib/solid_queue/supervisor.rb index 9ef736e4..e8f075eb 100644 --- a/lib/solid_queue/supervisor.rb +++ b/lib/solid_queue/supervisor.rb @@ -10,10 +10,10 @@ def start(**options) SolidQueue.supervisor = true configuration = Configuration.new(**options) - if 
configuration.configured_processes.any? + if configuration.valid? new(configuration).tap(&:start) else - abort "No workers or processed configured. Exiting..." + abort configuration.errors.full_messages.join("\n") + "\nExiting..." end end end diff --git a/lib/solid_queue/version.rb b/lib/solid_queue/version.rb index 27afdf20..557ba5c0 100644 --- a/lib/solid_queue/version.rb +++ b/lib/solid_queue/version.rb @@ -1,3 +1,3 @@ module SolidQueue - VERSION = "1.0.0" + VERSION = "1.1.0" end diff --git a/lib/solid_queue/worker.rb b/lib/solid_queue/worker.rb index fc203774..f34a14f0 100644 --- a/lib/solid_queue/worker.rb +++ b/lib/solid_queue/worker.rb @@ -7,6 +7,7 @@ class Worker < Processes::Poller after_boot :run_start_hooks before_shutdown :run_stop_hooks + attr_accessor :queues, :pool def initialize(**options) @@ -29,7 +30,7 @@ def poll pool.post(execution) end - executions.size + pool.idle? ? polling_interval : 10.minutes end end diff --git a/solid_queue.gemspec b/solid_queue.gemspec index 7faeabfa..5a4b0de4 100644 --- a/solid_queue.gemspec +++ b/solid_queue.gemspec @@ -37,11 +37,13 @@ Gem::Specification.new do |spec| spec.add_dependency "fugit", "~> 1.11.0" spec.add_dependency "thor", "~> 1.3.1" - spec.add_development_dependency "debug" + spec.add_development_dependency "debug", "~> 1.9" spec.add_development_dependency "mocha" spec.add_development_dependency "puma" spec.add_development_dependency "mysql2" spec.add_development_dependency "pg" spec.add_development_dependency "sqlite3" spec.add_development_dependency "rubocop-rails-omakase" + spec.add_development_dependency "rdoc" + spec.add_development_dependency "logger" end diff --git a/test/dummy/app/models/sharded_job_result.rb b/test/dummy/app/models/sharded_job_result.rb new file mode 100644 index 00000000..001bb3b7 --- /dev/null +++ b/test/dummy/app/models/sharded_job_result.rb @@ -0,0 +1,2 @@ +class ShardedJobResult < ShardedRecord +end diff --git a/test/dummy/app/models/sharded_record.rb 
b/test/dummy/app/models/sharded_record.rb new file mode 100644 index 00000000..87d4bcaf --- /dev/null +++ b/test/dummy/app/models/sharded_record.rb @@ -0,0 +1,8 @@ +class ShardedRecord < ApplicationRecord + self.abstract_class = true + + connects_to shards: { + shard_one: { writing: :shard_one }, + shard_two: { writing: :shard_two } + } +end diff --git a/test/dummy/bin/jobs b/test/dummy/bin/jobs new file mode 100755 index 00000000..dcf59f30 --- /dev/null +++ b/test/dummy/bin/jobs @@ -0,0 +1,6 @@ +#!/usr/bin/env ruby + +require_relative "../config/environment" +require "solid_queue/cli" + +SolidQueue::Cli.start(ARGV) diff --git a/test/dummy/config/application.rb b/test/dummy/config/application.rb index 502f70cb..8fa76e8c 100644 --- a/test/dummy/config/application.rb +++ b/test/dummy/config/application.rb @@ -28,9 +28,5 @@ class Application < Rails::Application # config.eager_load_paths << Rails.root.join("extras") config.active_job.queue_adapter = :solid_queue - - if ENV["SEPARATE_CONNECTION"] && ENV["TARGET_DB"] != "sqlite" - config.solid_queue.connects_to = { database: { writing: :primary, reading: :replica } } - end end end diff --git a/test/dummy/config/database.yml b/test/dummy/config/database.yml index 85f80b36..fdb186a5 100644 --- a/test/dummy/config/database.yml +++ b/test/dummy/config/database.yml @@ -10,7 +10,7 @@ <% if ENV["TARGET_DB"] == "sqlite" %> default: &default adapter: sqlite3 - pool: <%= ENV.fetch("RAILS_MAX_THREADS") { 50 } %> + pool: 50 retries: 100 <% elsif ENV["TARGET_DB"] == "postgres" %> @@ -18,7 +18,7 @@ default: &default adapter: postgresql encoding: unicode username: postgres - pool: 5 + pool: 20 host: "127.0.0.1" port: 55432 gssencmode: disable # https://github.com/ged/ruby-pg/issues/311 @@ -27,7 +27,7 @@ default: &default default: &default adapter: mysql2 username: root - pool: <%= ENV.fetch("RAILS_MAX_THREADS") { 5 } %> + pool: 20 host: "127.0.0.1" port: 33060 <% end %> @@ -35,19 +35,33 @@ default: &default development: primary: <<: 
*default - database: <%= database_name_from("solid_queue_development") %> - replica: + database: <%= database_name_from("development") %> + shard_one: <<: *default - database: <%= database_name_from("solid_queue_development") %> - replica: true + database: <%= database_name_from("development_shard_one") %> + migrations_paths: db/migrate_shards + shard_two: + <<: *default + database: <%= database_name_from("development_shard_two") %> + migrations_paths: db/migrate_shards + queue: + <<: *default + database: <%= database_name_from("development_queue") %> + migrations_paths: db/queue_migrate test: primary: <<: *default - pool: 20 - database: <%= database_name_from("solid_queue_test") %> - replica: + database: <%= database_name_from("test") %> + shard_one: + <<: *default + database: <%= database_name_from("test_shard_one") %> + migrations_paths: db/migrate_shards + shard_two: + <<: *default + database: <%= database_name_from("test_shard_two") %> + migrations_paths: db/migrate_shards + queue: <<: *default - pool: 20 - database: <%= database_name_from("solid_queue_test") %> - replica: true + database: <%= database_name_from("test_queue") %> + migrations_paths: db/queue_migrate diff --git a/test/dummy/config/environments/development.rb b/test/dummy/config/environments/development.rb index 4d9f1d46..da4baad4 100644 --- a/test/dummy/config/environments/development.rb +++ b/test/dummy/config/environments/development.rb @@ -51,6 +51,10 @@ # Raises error for missing translations. # config.i18n.raise_on_missing_translations = true + # Replace the default in-process and non-durable queuing backend for Active Job. + config.active_job.queue_adapter = :solid_queue + config.solid_queue.connects_to = { database: { writing: :queue } } + # Annotate rendered view with file names. 
# config.action_view.annotate_rendered_view_with_filenames = true diff --git a/test/dummy/config/environments/production.rb b/test/dummy/config/environments/production.rb index d08a6f9d..9323229a 100644 --- a/test/dummy/config/environments/production.rb +++ b/test/dummy/config/environments/production.rb @@ -44,8 +44,10 @@ # Use a different cache store in production. # config.cache_store = :mem_cache_store - # Use a real queuing backend for Active Job (and separate queues per environment). - # config.active_job.queue_adapter = :resque + # Replace the default in-process and non-durable queuing backend for Active Job. + config.active_job.queue_adapter = :solid_queue + config.solid_queue.connects_to = { database: { writing: :queue } } + # config.active_job.queue_name_prefix = "dummy_production" # Enable locale fallbacks for I18n (makes lookups for any locale fall back to diff --git a/test/dummy/config/environments/test.rb b/test/dummy/config/environments/test.rb index df832026..a5a99232 100644 --- a/test/dummy/config/environments/test.rb +++ b/test/dummy/config/environments/test.rb @@ -47,6 +47,10 @@ # Raises error for missing translations. # config.i18n.raise_on_missing_translations = true + # Replace the default in-process and non-durable queuing backend for Active Job. + config.active_job.queue_adapter = :solid_queue + config.solid_queue.connects_to = { database: { writing: :queue } } + # Annotate rendered view with file names. # config.action_view.annotate_rendered_view_with_filenames = true diff --git a/test/dummy/config/initializers/enable_yjit.rb b/test/dummy/config/initializers/enable_yjit.rb new file mode 100644 index 00000000..5367a5de --- /dev/null +++ b/test/dummy/config/initializers/enable_yjit.rb @@ -0,0 +1,12 @@ +# frozen_string_literal: true + +# Ideally, tests should be configured as close to production settings as +# possible and YJIT is likely to be enabled. While it's highly unlikely +# YJIT would cause issues, enabling it confirms this assertion. 
+# +# Configured via initializer to align with Rails 7.1 default in gemspec +if defined?(RubyVM::YJIT.enable) + Rails.application.config.after_initialize do + RubyVM::YJIT.enable + end +end diff --git a/test/dummy/config/recurring_with_invalid.yml b/test/dummy/config/recurring_with_invalid.yml new file mode 100644 index 00000000..69dacf6f --- /dev/null +++ b/test/dummy/config/recurring_with_invalid.yml @@ -0,0 +1,8 @@ +periodic_invalid_class: + class: StoreResultJorrrrrrb + queue: default + args: [42, { status: "custom_status" }] + schedule: every second +periodic_incorrect_schedule: + class: StoreResultJob + schedule: every 1.minute diff --git a/test/dummy/db/migrate_shards/20241205195735_create_sharded_job_results.rb b/test/dummy/db/migrate_shards/20241205195735_create_sharded_job_results.rb new file mode 100644 index 00000000..e2ded0c9 --- /dev/null +++ b/test/dummy/db/migrate_shards/20241205195735_create_sharded_job_results.rb @@ -0,0 +1,9 @@ +class CreateShardedJobResults < ActiveRecord::Migration[7.1] + def change + create_table :sharded_job_results do |t| + t.string :value + + t.timestamps + end + end +end diff --git a/test/dummy/db/queue_schema.rb b/test/dummy/db/queue_schema.rb new file mode 100644 index 00000000..697c2e92 --- /dev/null +++ b/test/dummy/db/queue_schema.rb @@ -0,0 +1,141 @@ +# This file is auto-generated from the current state of the database. Instead +# of editing this file, please use the migrations feature of Active Record to +# incrementally modify your database, and then regenerate this schema definition. +# +# This file is the source Rails uses to define your schema when running `bin/rails +# db:schema:load`. When creating a new database, `bin/rails db:schema:load` tends to +# be faster and is potentially less error prone than running all of your +# migrations from scratch. Old migrations may fail to apply correctly if those +# migrations use external dependencies or application code. 
+# +# It's strongly recommended that you check this file into your version control system. + +ActiveRecord::Schema[7.1].define(version: 1) do + create_table "solid_queue_blocked_executions", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t| + t.bigint "job_id", null: false + t.string "queue_name", null: false + t.integer "priority", default: 0, null: false + t.string "concurrency_key", null: false + t.datetime "expires_at", null: false + t.datetime "created_at", null: false + t.index ["concurrency_key", "priority", "job_id"], name: "index_solid_queue_blocked_executions_for_release" + t.index ["expires_at", "concurrency_key"], name: "index_solid_queue_blocked_executions_for_maintenance" + t.index ["job_id"], name: "index_solid_queue_blocked_executions_on_job_id", unique: true + end + + create_table "solid_queue_claimed_executions", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t| + t.bigint "job_id", null: false + t.bigint "process_id" + t.datetime "created_at", null: false + t.index ["job_id"], name: "index_solid_queue_claimed_executions_on_job_id", unique: true + t.index ["process_id", "job_id"], name: "index_solid_queue_claimed_executions_on_process_id_and_job_id" + end + + create_table "solid_queue_failed_executions", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t| + t.bigint "job_id", null: false + t.text "error" + t.datetime "created_at", null: false + t.index ["job_id"], name: "index_solid_queue_failed_executions_on_job_id", unique: true + end + + create_table "solid_queue_jobs", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t| + t.string "queue_name", null: false + t.string "class_name", null: false + t.text "arguments" + t.integer "priority", default: 0, null: false + t.string "active_job_id" + t.datetime "scheduled_at" + t.datetime "finished_at" + t.string "concurrency_key" + t.datetime "created_at", null: false + t.datetime "updated_at", null: false 
+ t.index ["active_job_id"], name: "index_solid_queue_jobs_on_active_job_id" + t.index ["class_name"], name: "index_solid_queue_jobs_on_class_name" + t.index ["finished_at"], name: "index_solid_queue_jobs_on_finished_at" + t.index ["queue_name", "finished_at"], name: "index_solid_queue_jobs_for_filtering" + t.index ["scheduled_at", "finished_at"], name: "index_solid_queue_jobs_for_alerting" + end + + create_table "solid_queue_pauses", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t| + t.string "queue_name", null: false + t.datetime "created_at", null: false + t.index ["queue_name"], name: "index_solid_queue_pauses_on_queue_name", unique: true + end + + create_table "solid_queue_processes", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t| + t.string "kind", null: false + t.datetime "last_heartbeat_at", null: false + t.bigint "supervisor_id" + t.integer "pid", null: false + t.string "hostname" + t.text "metadata" + t.datetime "created_at", null: false + t.string "name", null: false + t.index ["last_heartbeat_at"], name: "index_solid_queue_processes_on_last_heartbeat_at" + t.index ["name", "supervisor_id"], name: "index_solid_queue_processes_on_name_and_supervisor_id", unique: true + t.index ["supervisor_id"], name: "index_solid_queue_processes_on_supervisor_id" + end + + create_table "solid_queue_ready_executions", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t| + t.bigint "job_id", null: false + t.string "queue_name", null: false + t.integer "priority", default: 0, null: false + t.datetime "created_at", null: false + t.index ["job_id"], name: "index_solid_queue_ready_executions_on_job_id", unique: true + t.index ["priority", "job_id"], name: "index_solid_queue_poll_all" + t.index ["queue_name", "priority", "job_id"], name: "index_solid_queue_poll_by_queue" + end + + create_table "solid_queue_recurring_executions", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do 
|t| + t.bigint "job_id", null: false + t.string "task_key", null: false + t.datetime "run_at", null: false + t.datetime "created_at", null: false + t.index ["job_id"], name: "index_solid_queue_recurring_executions_on_job_id", unique: true + t.index ["task_key", "run_at"], name: "index_solid_queue_recurring_executions_on_task_key_and_run_at", unique: true + end + + create_table "solid_queue_recurring_tasks", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t| + t.string "key", null: false + t.string "schedule", null: false + t.string "command", limit: 2048 + t.string "class_name" + t.text "arguments" + t.string "queue_name" + t.integer "priority", default: 0 + t.boolean "static", default: true, null: false + t.text "description" + t.datetime "created_at", null: false + t.datetime "updated_at", null: false + t.index ["key"], name: "index_solid_queue_recurring_tasks_on_key", unique: true + t.index ["static"], name: "index_solid_queue_recurring_tasks_on_static" + end + + create_table "solid_queue_scheduled_executions", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t| + t.bigint "job_id", null: false + t.string "queue_name", null: false + t.integer "priority", default: 0, null: false + t.datetime "scheduled_at", null: false + t.datetime "created_at", null: false + t.index ["job_id"], name: "index_solid_queue_scheduled_executions_on_job_id", unique: true + t.index ["scheduled_at", "priority", "job_id"], name: "index_solid_queue_dispatch_all" + end + + create_table "solid_queue_semaphores", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t| + t.string "key", null: false + t.integer "value", default: 1, null: false + t.datetime "expires_at", null: false + t.datetime "created_at", null: false + t.datetime "updated_at", null: false + t.index ["expires_at"], name: "index_solid_queue_semaphores_on_expires_at" + t.index ["key", "value"], name: "index_solid_queue_semaphores_on_key_and_value" + t.index 
["key"], name: "index_solid_queue_semaphores_on_key", unique: true + end + + add_foreign_key "solid_queue_blocked_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade + add_foreign_key "solid_queue_claimed_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade + add_foreign_key "solid_queue_failed_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade + add_foreign_key "solid_queue_ready_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade + add_foreign_key "solid_queue_recurring_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade + add_foreign_key "solid_queue_scheduled_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade +end diff --git a/test/dummy/db/schema.rb b/test/dummy/db/schema.rb index a52fb820..fd3db435 100644 --- a/test/dummy/db/schema.rb +++ b/test/dummy/db/schema.rb @@ -18,132 +18,4 @@ t.datetime "created_at", null: false t.datetime "updated_at", null: false end - - create_table "solid_queue_blocked_executions", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t| - t.bigint "job_id", null: false - t.string "queue_name", null: false - t.integer "priority", default: 0, null: false - t.string "concurrency_key", null: false - t.datetime "expires_at", null: false - t.datetime "created_at", null: false - t.index ["concurrency_key", "priority", "job_id"], name: "index_solid_queue_blocked_executions_for_release" - t.index ["expires_at", "concurrency_key"], name: "index_solid_queue_blocked_executions_for_maintenance" - t.index ["job_id"], name: "index_solid_queue_blocked_executions_on_job_id", unique: true - end - - create_table "solid_queue_claimed_executions", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t| - t.bigint "job_id", null: false - t.bigint "process_id" - t.datetime "created_at", null: false - t.index ["job_id"], name: "index_solid_queue_claimed_executions_on_job_id", unique: true - t.index 
["process_id", "job_id"], name: "index_solid_queue_claimed_executions_on_process_id_and_job_id" - end - - create_table "solid_queue_failed_executions", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t| - t.bigint "job_id", null: false - t.text "error" - t.datetime "created_at", null: false - t.index ["job_id"], name: "index_solid_queue_failed_executions_on_job_id", unique: true - end - - create_table "solid_queue_jobs", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t| - t.string "queue_name", null: false - t.string "class_name", null: false - t.text "arguments" - t.integer "priority", default: 0, null: false - t.string "active_job_id" - t.datetime "scheduled_at" - t.datetime "finished_at" - t.string "concurrency_key" - t.datetime "created_at", null: false - t.datetime "updated_at", null: false - t.index ["active_job_id"], name: "index_solid_queue_jobs_on_active_job_id" - t.index ["class_name"], name: "index_solid_queue_jobs_on_class_name" - t.index ["finished_at"], name: "index_solid_queue_jobs_on_finished_at" - t.index ["queue_name", "finished_at"], name: "index_solid_queue_jobs_for_filtering" - t.index ["scheduled_at", "finished_at"], name: "index_solid_queue_jobs_for_alerting" - end - - create_table "solid_queue_pauses", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t| - t.string "queue_name", null: false - t.datetime "created_at", null: false - t.index ["queue_name"], name: "index_solid_queue_pauses_on_queue_name", unique: true - end - - create_table "solid_queue_processes", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t| - t.string "kind", null: false - t.datetime "last_heartbeat_at", null: false - t.bigint "supervisor_id" - t.integer "pid", null: false - t.string "hostname" - t.text "metadata" - t.datetime "created_at", null: false - t.string "name", null: false - t.index ["last_heartbeat_at"], name: "index_solid_queue_processes_on_last_heartbeat_at" - 
t.index ["name", "supervisor_id"], name: "index_solid_queue_processes_on_name_and_supervisor_id", unique: true - t.index ["supervisor_id"], name: "index_solid_queue_processes_on_supervisor_id" - end - - create_table "solid_queue_ready_executions", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t| - t.bigint "job_id", null: false - t.string "queue_name", null: false - t.integer "priority", default: 0, null: false - t.datetime "created_at", null: false - t.index ["job_id"], name: "index_solid_queue_ready_executions_on_job_id", unique: true - t.index ["priority", "job_id"], name: "index_solid_queue_poll_all" - t.index ["queue_name", "priority", "job_id"], name: "index_solid_queue_poll_by_queue" - end - - create_table "solid_queue_recurring_executions", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t| - t.bigint "job_id", null: false - t.string "task_key", null: false - t.datetime "run_at", null: false - t.datetime "created_at", null: false - t.index ["job_id"], name: "index_solid_queue_recurring_executions_on_job_id", unique: true - t.index ["task_key", "run_at"], name: "index_solid_queue_recurring_executions_on_task_key_and_run_at", unique: true - end - - create_table "solid_queue_recurring_tasks", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t| - t.string "key", null: false - t.string "schedule", null: false - t.string "command", limit: 2048 - t.string "class_name" - t.text "arguments" - t.string "queue_name" - t.integer "priority", default: 0 - t.boolean "static", default: true, null: false - t.text "description" - t.datetime "created_at", null: false - t.datetime "updated_at", null: false - t.index ["key"], name: "index_solid_queue_recurring_tasks_on_key", unique: true - t.index ["static"], name: "index_solid_queue_recurring_tasks_on_static" - end - - create_table "solid_queue_scheduled_executions", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t| - t.bigint 
"job_id", null: false - t.string "queue_name", null: false - t.integer "priority", default: 0, null: false - t.datetime "scheduled_at", null: false - t.datetime "created_at", null: false - t.index ["job_id"], name: "index_solid_queue_scheduled_executions_on_job_id", unique: true - t.index ["scheduled_at", "priority", "job_id"], name: "index_solid_queue_dispatch_all" - end - - create_table "solid_queue_semaphores", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t| - t.string "key", null: false - t.integer "value", default: 1, null: false - t.datetime "expires_at", null: false - t.datetime "created_at", null: false - t.datetime "updated_at", null: false - t.index ["expires_at"], name: "index_solid_queue_semaphores_on_expires_at" - t.index ["key", "value"], name: "index_solid_queue_semaphores_on_key_and_value" - t.index ["key"], name: "index_solid_queue_semaphores_on_key", unique: true - end - - add_foreign_key "solid_queue_blocked_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade - add_foreign_key "solid_queue_claimed_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade - add_foreign_key "solid_queue_failed_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade - add_foreign_key "solid_queue_ready_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade - add_foreign_key "solid_queue_recurring_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade - add_foreign_key "solid_queue_scheduled_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade end diff --git a/test/dummy/db/shard_one_schema.rb b/test/dummy/db/shard_one_schema.rb new file mode 100644 index 00000000..2a6d3806 --- /dev/null +++ b/test/dummy/db/shard_one_schema.rb @@ -0,0 +1,20 @@ +# This file is auto-generated from the current state of the database. 
Instead +# of editing this file, please use the migrations feature of Active Record to +# incrementally modify your database, and then regenerate this schema definition. +# +# This file is the source Rails uses to define your schema when running `bin/rails +# db:schema:load`. When creating a new database, `bin/rails db:schema:load` tends to +# be faster and is potentially less error prone than running all of your +# migrations from scratch. Old migrations may fail to apply correctly if those +# migrations use external dependencies or application code. +# +# It's strongly recommended that you check this file into your version control system. + +ActiveRecord::Schema[7.1].define(version: 2024_12_05_195735) do + create_table "sharded_job_results", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t| + t.string "value" + t.datetime "created_at", null: false + t.datetime "updated_at", null: false + end + +end diff --git a/test/dummy/db/shard_two_schema.rb b/test/dummy/db/shard_two_schema.rb new file mode 100644 index 00000000..2a6d3806 --- /dev/null +++ b/test/dummy/db/shard_two_schema.rb @@ -0,0 +1,20 @@ +# This file is auto-generated from the current state of the database. Instead +# of editing this file, please use the migrations feature of Active Record to +# incrementally modify your database, and then regenerate this schema definition. +# +# This file is the source Rails uses to define your schema when running `bin/rails +# db:schema:load`. When creating a new database, `bin/rails db:schema:load` tends to +# be faster and is potentially less error prone than running all of your +# migrations from scratch. Old migrations may fail to apply correctly if those +# migrations use external dependencies or application code. +# +# It's strongly recommended that you check this file into your version control system. 
+ +ActiveRecord::Schema[7.1].define(version: 2024_12_05_195735) do + create_table "sharded_job_results", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t| + t.string "value" + t.datetime "created_at", null: false + t.datetime "updated_at", null: false + end + +end diff --git a/test/integration/concurrency_controls_test.rb b/test/integration/concurrency_controls_test.rb index e181e4ca..dbce706d 100644 --- a/test/integration/concurrency_controls_test.rb +++ b/test/integration/concurrency_controls_test.rb @@ -85,7 +85,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase test "run several jobs over the same record sequentially, with some of them failing" do ("A".."F").each_with_index do |name, i| # A, C, E will fail, for i= 0, 2, 4 - SequentialUpdateResultJob.perform_later(@result, name: name, pause: 0.2.seconds, exception: (RuntimeError if i.even?)) + SequentialUpdateResultJob.perform_later(@result, name: name, pause: 0.2.seconds, exception: (ExpectedTestError if i.even?)) end ("G".."K").each do |name| diff --git a/test/integration/instrumentation_test.rb b/test/integration/instrumentation_test.rb index c90d161a..59443ccf 100644 --- a/test/integration/instrumentation_test.rb +++ b/test/integration/instrumentation_test.rb @@ -162,7 +162,7 @@ class InstrumentationTest < ActiveSupport::TestCase test "errors when deregistering processes are included in deregister_process events" do previous_thread_report_on_exception, Thread.report_on_exception = Thread.report_on_exception, false - error = RuntimeError.new("everything is broken") + error = ExpectedTestError.new("everything is broken") SolidQueue::Process.any_instance.expects(:destroy!).raises(error).at_least_once events = subscribed("deregister_process.solid_queue") do @@ -182,7 +182,7 @@ class InstrumentationTest < ActiveSupport::TestCase end test "retrying failed job emits retry event" do - RaisingJob.perform_later(RuntimeError, "A") + RaisingJob.perform_later(ExpectedTestError, "A") 
job = SolidQueue::Job.last worker = SolidQueue::Worker.new.tap(&:start) @@ -198,7 +198,7 @@ class InstrumentationTest < ActiveSupport::TestCase end test "retrying failed jobs in bulk emits retry_all" do - 3.times { RaisingJob.perform_later(RuntimeError, "A") } + 3.times { RaisingJob.perform_later(ExpectedTestError, "A") } AddToBufferJob.perform_later("A") jobs = SolidQueue::Job.last(4) @@ -392,7 +392,7 @@ class InstrumentationTest < ActiveSupport::TestCase test "thread errors emit thread_error events" do previous_thread_report_on_exception, Thread.report_on_exception = Thread.report_on_exception, false - error = RuntimeError.new("everything is broken") + error = ExpectedTestError.new("everything is broken") SolidQueue::ClaimedExecution::Result.expects(:new).raises(error).at_least_once AddToBufferJob.perform_later "hey!" diff --git a/test/integration/jobs_lifecycle_test.rb b/test/integration/jobs_lifecycle_test.rb index e1b713ee..1740f760 100644 --- a/test/integration/jobs_lifecycle_test.rb +++ b/test/integration/jobs_lifecycle_test.rb @@ -4,11 +4,13 @@ class JobsLifecycleTest < ActiveSupport::TestCase setup do + SolidQueue.on_thread_error = silent_on_thread_error_for([ ExpectedTestError, RaisingJob::DefaultError ]) @worker = SolidQueue::Worker.new(queues: "background", threads: 3) @dispatcher = SolidQueue::Dispatcher.new(batch_size: 10, polling_interval: 0.2) end teardown do + SolidQueue.on_thread_error = @on_thread_error @worker.stop @dispatcher.stop @@ -29,8 +31,8 @@ class JobsLifecycleTest < ActiveSupport::TestCase end test "enqueue and run jobs that fail without retries" do - RaisingJob.perform_later(RuntimeError, "A") - RaisingJob.perform_later(RuntimeError, "B") + RaisingJob.perform_later(ExpectedTestError, "A") + RaisingJob.perform_later(ExpectedTestError, "B") jobs = SolidQueue::Job.last(2) @dispatcher.start @@ -38,7 +40,7 @@ class JobsLifecycleTest < ActiveSupport::TestCase wait_for_jobs_to_finish_for(3.seconds) - message = "raised RuntimeError for the 1st 
time" + message = "raised ExpectedTestError for the 1st time" assert_equal [ "A: #{message}", "B: #{message}" ], JobBuffer.values.sort assert_empty SolidQueue::Job.finished diff --git a/test/integration/processes_lifecycle_test.rb b/test/integration/processes_lifecycle_test.rb index 5d5c2072..b96c452d 100644 --- a/test/integration/processes_lifecycle_test.rb +++ b/test/integration/processes_lifecycle_test.rb @@ -144,11 +144,11 @@ class ProcessesLifecycleTest < ActiveSupport::TestCase test "process some jobs that raise errors" do 2.times { enqueue_store_result_job("no error", :background) } 2.times { enqueue_store_result_job("no error", :default) } - error1 = enqueue_store_result_job("error", :background, exception: RuntimeError) + error1 = enqueue_store_result_job("error", :background, exception: ExpectedTestError) enqueue_store_result_job("no error", :background, pause: 0.03) - error2 = enqueue_store_result_job("error", :background, exception: RuntimeError, pause: 0.05) + error2 = enqueue_store_result_job("error", :background, exception: ExpectedTestError, pause: 0.05) 2.times { enqueue_store_result_job("no error", :default, pause: 0.01) } - error3 = enqueue_store_result_job("error", :default, exception: RuntimeError) + error3 = enqueue_store_result_job("error", :default, exception: ExpectedTestError) wait_for_jobs_to_finish_for(2.second, except: [ error1, error2, error3 ]) diff --git a/test/integration/recurring_tasks_test.rb b/test/integration/recurring_tasks_test.rb index aa48c12a..7367bc06 100644 --- a/test/integration/recurring_tasks_test.rb +++ b/test/integration/recurring_tasks_test.rb @@ -64,13 +64,13 @@ class RecurringTasksTest < ActiveSupport::TestCase scheduler1 = SolidQueue::Scheduler.new(recurring_tasks: another_task).tap(&:start) wait_for_registered_processes(6, timeout: 1.second) - assert_recurring_tasks configured_task.merge(another_task) + assert_recurring_tasks another_task updated_task = { example_task: { class: "AddToBufferJob", schedule: 
"every minute" } } scheduler2 = SolidQueue::Scheduler.new(recurring_tasks: updated_task).tap(&:start) wait_for_registered_processes(7, timeout: 1.second) - assert_recurring_tasks configured_task.merge(updated_task) + assert_recurring_tasks updated_task terminate_process(@pid) scheduler1.stop diff --git a/test/models/solid_queue/claimed_execution_test.rb b/test/models/solid_queue/claimed_execution_test.rb index 226dad77..4e99fd04 100644 --- a/test/models/solid_queue/claimed_execution_test.rb +++ b/test/models/solid_queue/claimed_execution_test.rb @@ -22,7 +22,9 @@ class SolidQueue::ClaimedExecutionTest < ActiveSupport::TestCase job = claimed_execution.job assert_difference -> { SolidQueue::ClaimedExecution.count } => -1, -> { SolidQueue::FailedExecution.count } => 1 do - claimed_execution.perform + assert_raises RuntimeError do + claimed_execution.perform + end end assert_not job.reload.finished? @@ -37,10 +39,12 @@ class SolidQueue::ClaimedExecutionTest < ActiveSupport::TestCase test "job failures are reported via Rails error subscriber" do subscriber = ErrorBuffer.new - with_error_subscriber(subscriber) do - claimed_execution = prepare_and_claim_job RaisingJob.perform_later(RuntimeError, "B") + assert_raises RuntimeError do + with_error_subscriber(subscriber) do + claimed_execution = prepare_and_claim_job RaisingJob.perform_later(RuntimeError, "B") - claimed_execution.perform + claimed_execution.perform + end end assert_equal 1, subscriber.errors.count diff --git a/test/models/solid_queue/failed_execution_test.rb b/test/models/solid_queue/failed_execution_test.rb index 7b142991..c2299b8a 100644 --- a/test/models/solid_queue/failed_execution_test.rb +++ b/test/models/solid_queue/failed_execution_test.rb @@ -7,7 +7,7 @@ class SolidQueue::FailedExecutionTest < ActiveSupport::TestCase end test "run job that fails" do - RaisingJob.perform_later(RuntimeError, "A") + RaisingJob.perform_later(ExpectedTestError, "A") @worker.start assert_equal 1, 
SolidQueue::FailedExecution.count @@ -15,15 +15,17 @@ class SolidQueue::FailedExecutionTest < ActiveSupport::TestCase end test "run job that fails with a SystemStackError (stack level too deep)" do - InfiniteRecursionJob.perform_later - @worker.start + silence_on_thread_error_for(SystemStackError) do + InfiniteRecursionJob.perform_later + @worker.start - assert_equal 1, SolidQueue::FailedExecution.count - assert SolidQueue::Job.last.failed? + assert_equal 1, SolidQueue::FailedExecution.count + assert SolidQueue::Job.last.failed? + end end test "retry failed job" do - RaisingJob.perform_later(RuntimeError, "A") + RaisingJob.perform_later(ExpectedTestError, "A") @worker.start assert_difference -> { SolidQueue::FailedExecution.count }, -1 do @@ -34,7 +36,7 @@ class SolidQueue::FailedExecutionTest < ActiveSupport::TestCase end test "retry failed jobs in bulk" do - 1.upto(5) { |i| RaisingJob.perform_later(RuntimeError, i) } + 1.upto(5) { |i| RaisingJob.perform_later(ExpectedTestError, i) } 1.upto(3) { |i| AddToBufferJob.perform_later(i) } @worker.start diff --git a/test/models/solid_queue/job_test.rb b/test/models/solid_queue/job_test.rb index a9b3cc59..17a658d7 100644 --- a/test/models/solid_queue/job_test.rb +++ b/test/models/solid_queue/job_test.rb @@ -68,6 +68,16 @@ class NonOverlappingGroupedJob2 < NonOverlappingJob assert_equal solid_queue_job.scheduled_at, execution.scheduled_at end + test "enqueue jobs within a connected_to block for the primary DB" do + ShardedRecord.connected_to(role: :writing, shard: :shard_two) do + ShardedJobResult.create!(value: "in shard two") + AddToBufferJob.perform_later("enqueued within block") + end + + job = SolidQueue::Job.last + assert_equal "enqueued within block", job.arguments.dig("arguments", 0) + end + test "enqueue jobs without concurrency controls" do active_job = AddToBufferJob.perform_later(1) assert_nil active_job.concurrency_limit @@ -249,21 +259,19 @@ class NonOverlappingGroupedJob2 < NonOverlappingJob end end - if 
ENV["SEPARATE_CONNECTION"] && ENV["TARGET_DB"] != "sqlite" - test "uses a different connection and transaction than the one in use when connects_to is specified" do - assert_difference -> { SolidQueue::Job.count } do - assert_no_difference -> { JobResult.count } do - JobResult.transaction do - JobResult.create!(queue_name: "default", value: "this will be rolled back") - StoreResultJob.perform_later("enqueued inside a rolled back transaction") - raise ActiveRecord::Rollback - end + test "enqueue successfully inside a rolled-back transaction in the app DB" do + assert_difference -> { SolidQueue::Job.count } do + assert_no_difference -> { JobResult.count } do + JobResult.transaction do + JobResult.create!(queue_name: "default", value: "this will be rolled back") + StoreResultJob.perform_later("enqueued inside a rolled back transaction") + raise ActiveRecord::Rollback end end - - job = SolidQueue::Job.last - assert_equal "enqueued inside a rolled back transaction", job.arguments.dig("arguments", 0) end + + job = SolidQueue::Job.last + assert_equal "enqueued inside a rolled back transaction", job.arguments.dig("arguments", 0) end private diff --git a/test/models/solid_queue/ready_execution_test.rb b/test/models/solid_queue/ready_execution_test.rb index 9f904c89..dd9269ca 100644 --- a/test/models/solid_queue/ready_execution_test.rb +++ b/test/models/solid_queue/ready_execution_test.rb @@ -49,28 +49,51 @@ class SolidQueue::ReadyExecutionTest < ActiveSupport::TestCase end end - test "queue order and then priority is respected when using a list of queues" do + test "claim jobs using a wildcard" do AddToBufferJob.perform_later("hey") - job = SolidQueue::Job.last - assert_equal "background", job.queue_name - assert_claimed_jobs(3) do - SolidQueue::ReadyExecution.claim(%w[ background backend ], 3, 42) + assert_claimed_jobs(6) do + SolidQueue::ReadyExecution.claim("*", SolidQueue::Job.count + 1, 42) end + end - assert job.reload.claimed? 
- @jobs.first(2).each do |job| - assert_not job.reload.ready? - assert job.claimed? + test "claim jobs using queue prefixes" do + AddToBufferJob.perform_later("hey") + + assert_claimed_jobs(1) do + SolidQueue::ReadyExecution.claim("backgr*", SolidQueue::Job.count + 1, 42) end + + assert @jobs.none?(&:claimed?) end - test "claim jobs using a wildcard" do + test "claim jobs using a wildcard and having paused queues" do AddToBufferJob.perform_later("hey") - assert_claimed_jobs(6) do + SolidQueue::Queue.find_by_name("backend").pause + + assert_claimed_jobs(1) do SolidQueue::ReadyExecution.claim("*", SolidQueue::Job.count + 1, 42) end + + @jobs.each(&:reload) + assert @jobs.none?(&:claimed?) + end + + test "claim jobs using both exact names and a prefix" do + AddToBufferJob.perform_later("hey") + + assert_claimed_jobs(6) do + SolidQueue::ReadyExecution.claim(%w[ backe* background ], SolidQueue::Job.count + 1, 42) + end + end + + test "claim jobs for queue without jobs at the moment using prefixes" do + AddToBufferJob.perform_later("hey") + + assert_claimed_jobs(0) do + SolidQueue::ReadyExecution.claim(%w[ none* ], SolidQueue::Job.count + 1, 42) + end end test "priority order is used when claiming jobs using a wildcard" do @@ -88,43 +111,61 @@ class SolidQueue::ReadyExecutionTest < ActiveSupport::TestCase end end - test "claim jobs using queue prefixes" do + test "queue order and then priority is respected when using a list of queues" do AddToBufferJob.perform_later("hey") + job = SolidQueue::Job.last + assert_equal "background", job.queue_name - assert_claimed_jobs(1) do - SolidQueue::ReadyExecution.claim("backgr*", SolidQueue::Job.count + 1, 42) + assert_claimed_jobs(3) do + SolidQueue::ReadyExecution.claim(%w[ background backend ], 3, 42) end - assert @jobs.none?(&:claimed?) + assert job.reload.claimed? + @jobs.first(2).each do |job| + assert_not job.reload.ready? + assert job.claimed? 
+ end end - test "claim jobs using a wildcard and having paused queues" do - AddToBufferJob.perform_later("hey") + test "queue order is respected when using prefixes" do + %w[ queue_b1 queue_b2 queue_a2 queue_a1 queue_b1 queue_a2 queue_b2 queue_a1 ].each do |queue_name| + AddToBufferJob.set(queue: queue_name).perform_later(1) + end - SolidQueue::Queue.find_by_name("backend").pause + # Claim 8 jobs + claimed_jobs = [] + 4.times do + assert_claimed_jobs(2) do + SolidQueue::ReadyExecution.claim(%w[ queue_b* queue_a* ], 2, 42) + end - assert_claimed_jobs(1) do - SolidQueue::ReadyExecution.claim("*", SolidQueue::Job.count + 1, 42) + claimed_jobs += SolidQueue::ClaimedExecution.order(:id).last(2).map(&:job) end - @jobs.each(&:reload) - assert @jobs.none?(&:claimed?) + # Check claim order + assert_equal %w[ queue_b1 queue_b1 queue_b2 queue_b2 queue_a1 queue_a1 queue_a2 queue_a2 ], + claimed_jobs.map(&:queue_name) end - test "claim jobs using both exact names and a prefixes" do - AddToBufferJob.perform_later("hey") - assert_claimed_jobs(6) do - SolidQueue::ReadyExecution.claim(%w[ backe* background ], SolidQueue::Job.count + 1, 42) + test "queue order is respected when mixing exact names with prefixes" do + %w[ queue_b1 queue_b2 queue_a2 queue_c2 queue_a1 queue_c1 queue_b1 queue_a2 queue_b2 queue_a1 ].each do |queue_name| + AddToBufferJob.set(queue: queue_name).perform_later(1) end - end - test "claim jobs for queue without jobs at the moment using prefixes" do - AddToBufferJob.perform_later("hey") + # Claim 10 jobs + claimed_jobs = [] + 5.times do + assert_claimed_jobs(2) do + SolidQueue::ReadyExecution.claim(%w[ queue_a2 queue_c1 queue_b* queue_c2 queue_a* ], 2, 42) + end - assert_claimed_jobs(0) do - SolidQueue::ReadyExecution.claim(%w[ none* ], SolidQueue::Job.count + 1, 42) + claimed_jobs += SolidQueue::ClaimedExecution.order(:id).last(2).map(&:job) end + + # Check claim order + assert_equal %w[ queue_a2 queue_a2 queue_c1 queue_b1 queue_b1 queue_b2 queue_b2 queue_c2 
queue_a1 queue_a1 ], + claimed_jobs.map(&:queue_name) end test "discard all" do diff --git a/test/test_helper.rb b/test/test_helper.rb index 176cb6e1..539f67bc 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -24,11 +24,20 @@ def write(...) end Logger::LogDevice.prepend(BlockLogDeviceTimeoutExceptions) +class ExpectedTestError < RuntimeError; end + class ActiveSupport::TestCase - include ProcessesTestHelper, JobsTestHelper + include ConfigurationTestHelper, ProcessesTestHelper, JobsTestHelper + + setup do + # Could be cleaner with one several minitest gems, but didn't want to add new dependency + @_on_thread_error = SolidQueue.on_thread_error + SolidQueue.on_thread_error = silent_on_thread_error_for(ExpectedTestError) + end teardown do + SolidQueue.on_thread_error = @_on_thread_error JobBuffer.clear if SolidQueue.supervisor_pidfile && File.exist?(SolidQueue.supervisor_pidfile) @@ -69,4 +78,28 @@ def wait_while_with_timeout!(timeout, &block) def skip_active_record_query_cache(&block) SolidQueue::Record.uncached(&block) end + + # Silences specified exceptions during the execution of a block + # + # @param [Exception, Array] expected an Exception or an array of Exceptions to ignore + # @yield Executes the provided block with specified exception(s) silenced + def silence_on_thread_error_for(expected, &block) + SolidQueue.with(on_thread_error: silent_on_thread_error_for(expected)) do + block.call + end + end + + # Does not call on_thread_error for expected exceptions + # @param [Exception, Array] expected an Exception or an array of Exceptions to ignore + def silent_on_thread_error_for(expected) + current_proc = SolidQueue.on_thread_error + + ->(exception) do + expected_exceptions = Array(expected) + + unless expected_exceptions.any? 
{ exception.instance_of?(_1) } + current_proc.call(exception) + end + end + end end diff --git a/test/test_helpers/configuration_test_helper.rb b/test/test_helpers/configuration_test_helper.rb new file mode 100644 index 00000000..24b95e6b --- /dev/null +++ b/test/test_helpers/configuration_test_helper.rb @@ -0,0 +1,7 @@ +# frozen_string_literal: true + +module ConfigurationTestHelper + def config_file_path(name) + Rails.root.join("config/#{name}.yml") + end +end diff --git a/test/test_helpers/processes_test_helper.rb b/test/test_helpers/processes_test_helper.rb index 729216bd..9a6d0f65 100644 --- a/test/test_helpers/processes_test_helper.rb +++ b/test/test_helpers/processes_test_helper.rb @@ -7,6 +7,16 @@ def run_supervisor_as_fork(**options) end end + def run_supervisor_as_fork_with_captured_io(**options) + pid = nil + out, err = capture_subprocess_io do + pid = run_supervisor_as_fork(**options) + wait_for_registered_processes(4) + end + + [ pid, out, err ] + end + def wait_for_registered_processes(count, timeout: 1.second) wait_while_with_timeout(timeout) { SolidQueue::Process.count != count } end diff --git a/test/unit/configuration_test.rb b/test/unit/configuration_test.rb index 556a4930..87b8726e 100644 --- a/test/unit/configuration_test.rb +++ b/test/unit/configuration_test.rb @@ -54,11 +54,6 @@ class ConfigurationTest < ActiveSupport::TestCase assert_processes configuration, :worker, 2 end - test "max number of threads" do - configuration = SolidQueue::Configuration.new - assert 7, configuration.max_number_of_threads - end - test "mulitple workers with the same configuration" do background_worker = { queues: "background", polling_interval: 10, processes: 3 } configuration = SolidQueue::Configuration.new(workers: [ background_worker ]) @@ -90,6 +85,32 @@ class ConfigurationTest < ActiveSupport::TestCase assert_processes configuration, :dispatcher, 1, polling_interval: 0.1, recurring_tasks: nil end + test "validate configuration" do + # Valid and invalid 
recurring tasks + configuration = SolidQueue::Configuration.new(recurring_schedule_file: config_file_path(:recurring_with_invalid)) + assert_not configuration.valid? + assert configuration.errors.full_messages.one? + error = configuration.errors.full_messages.first + + assert error.include?("Invalid recurring tasks") + assert error.include?("periodic_invalid_class: Class name doesn't correspond to an existing class") + assert error.include?("periodic_incorrect_schedule: Schedule is not a supported recurring schedule") + + assert SolidQueue::Configuration.new(recurring_schedule_file: config_file_path(:empty_recurring)).valid? + assert SolidQueue::Configuration.new(skip_recurring: true).valid? + + # No processes + configuration = SolidQueue::Configuration.new(skip_recurring: true, dispatchers: [], workers: []) + assert_not configuration.valid? + assert_equal [ "No processes configured" ], configuration.errors.full_messages + + # Not enough DB connections + configuration = SolidQueue::Configuration.new(workers: [ { queues: "background", threads: 50, polling_interval: 10 } ]) + assert_not configuration.valid? + assert_match /Solid Queue is configured to use \d+ threads but the database connection pool is \d+. 
Increase it in `config\/database.yml`/, + configuration.errors.full_messages.first + end + private def assert_processes(configuration, kind, count, **attributes) processes = configuration.configured_processes.select { |p| p.kind == kind } @@ -121,8 +142,4 @@ def assert_equal_value(expected_value, value) assert_equal expected_value, value end end - - def config_file_path(name) - Rails.root.join("config/#{name}.yml") - end end diff --git a/test/unit/dispatcher_test.rb b/test/unit/dispatcher_test.rb index 42d57c92..5bca7743 100644 --- a/test/unit/dispatcher_test.rb +++ b/test/unit/dispatcher_test.rb @@ -92,6 +92,30 @@ class DispatcherTest < ActiveSupport::TestCase another_dispatcher&.stop end + test "sleeps `0.seconds` between polls if there are ready to dispatch jobs" do + dispatcher = SolidQueue::Dispatcher.new(polling_interval: 10, batch_size: 1) + dispatcher.expects(:interruptible_sleep).with(0.seconds).at_least(3) + dispatcher.expects(:interruptible_sleep).with(dispatcher.polling_interval).at_least_once + + 3.times { AddToBufferJob.set(wait: 0.1).perform_later("I'm scheduled") } + assert_equal 3, SolidQueue::ScheduledExecution.count + sleep 0.1 + + dispatcher.start + wait_while_with_timeout(1.second) { SolidQueue::ScheduledExecution.any? 
} + + assert_equal 0, SolidQueue::ScheduledExecution.count + assert_equal 3, SolidQueue::ReadyExecution.count + end + + test "sleeps `polling_interval` between polls if there are no un-dispatched jobs" do + dispatcher = SolidQueue::Dispatcher.new(polling_interval: 10, batch_size: 1) + dispatcher.expects(:interruptible_sleep).with(0.seconds).never + dispatcher.expects(:interruptible_sleep).with(dispatcher.polling_interval).at_least_once + dispatcher.start + sleep 0.1 + end + private def with_polling(silence:) old_silence_polling, SolidQueue.silence_polling = SolidQueue.silence_polling, silence diff --git a/test/unit/queue_test.rb b/test/unit/queue_test.rb index f546c8d6..c89c932c 100644 --- a/test/unit/queue_test.rb +++ b/test/unit/queue_test.rb @@ -2,6 +2,8 @@ class QueueTest < ActiveSupport::TestCase setup do + freeze_time + 5.times do AddToBufferJob.perform_later "hey!" end @@ -39,4 +41,35 @@ class QueueTest < ActiveSupport::TestCase @default_queue.resume end end + + test "return latency in seconds on each queue" do + travel_to 5.minutes.from_now + + assert_in_delta 5.minutes.to_i, @background_queue.latency, 1.second.to_i + assert_equal 0, @default_queue.latency + + @background_queue = SolidQueue::Queue.find_by_name("background") + @default_queue = SolidQueue::Queue.find_by_name("default") + travel_to 10.minutes.from_now + + assert_in_delta 15.minutes.to_i, @background_queue.latency, 1.second.to_i + assert_equal 0, @default_queue.latency + end + + test "returns memoized latency after the first call" do + travel_to 5.minutes.from_now + + assert_in_delta 5.minutes.to_i, @background_queue.latency, 1.second.to_i + + travel_to 10.minutes.from_now + + assert_in_delta 5.minutes.to_i, @background_queue.latency, 1.second.to_i + end + + test "return human latency on each queue" do + travel_to 5.minutes.from_now + + assert_match (/5 minutes/), @background_queue.human_latency + assert_match (/0 seconds/), @default_queue.human_latency + end end diff --git 
a/test/unit/supervisor_test.rb b/test/unit/supervisor_test.rb index d4919070..c430544a 100644 --- a/test/unit/supervisor_test.rb +++ b/test/unit/supervisor_test.rb @@ -41,11 +41,22 @@ class SupervisorTest < ActiveSupport::TestCase end test "start with empty configuration" do - pid = run_supervisor_as_fork(workers: [], dispatchers: []) + pid, _out, error = run_supervisor_as_fork_with_captured_io(workers: [], dispatchers: []) sleep(0.5) assert_no_registered_processes assert_not process_exists?(pid) + assert_match %r{No processes configured}, error + end + + test "start with invalid recurring tasks" do + pid, _out, error = run_supervisor_as_fork_with_captured_io(recurring_schedule_file: config_file_path(:recurring_with_invalid), skip_recurring: false) + + sleep(0.5) + assert_no_registered_processes + + assert_not process_exists?(pid) + assert_match %r{Invalid recurring tasks}, error end test "create and delete pidfile" do @@ -66,11 +77,12 @@ class SupervisorTest < ActiveSupport::TestCase FileUtils.mkdir_p(File.dirname(@pidfile)) File.write(@pidfile, ::Process.pid.to_s) - pid = run_supervisor_as_fork + pid, _out, err = run_supervisor_as_fork_with_captured_io wait_for_registered_processes(4) assert File.exist?(@pidfile) assert_not_equal pid, File.read(@pidfile).strip.to_i + assert_match %r{A Solid Queue supervisor is already running}, err wait_for_process_termination_with_timeout(pid, exitstatus: 1) end diff --git a/test/unit/worker_test.rb b/test/unit/worker_test.rb index 3523e4a1..52b0d8e8 100644 --- a/test/unit/worker_test.rb +++ b/test/unit/worker_test.rb @@ -28,14 +28,14 @@ class WorkerTest < ActiveSupport::TestCase original_on_thread_error, SolidQueue.on_thread_error = SolidQueue.on_thread_error, ->(error) { errors << error.message } previous_thread_report_on_exception, Thread.report_on_exception = Thread.report_on_exception, false - SolidQueue::ReadyExecution.expects(:claim).raises(RuntimeError.new("everything is broken")).at_least_once + 
SolidQueue::ReadyExecution.expects(:claim).raises(ExpectedTestError.new("everything is broken")).at_least_once AddToBufferJob.perform_later "hey!" worker = SolidQueue::Worker.new(queues: "background", threads: 3, polling_interval: 0.2).tap(&:start) sleep(1) - assert_raises RuntimeError do + assert_raises ExpectedTestError do worker.stop end @@ -51,7 +51,7 @@ class WorkerTest < ActiveSupport::TestCase subscriber = ErrorBuffer.new Rails.error.subscribe(subscriber) - SolidQueue::ClaimedExecution::Result.expects(:new).raises(RuntimeError.new("everything is broken")).at_least_once + SolidQueue::ClaimedExecution::Result.expects(:new).raises(ExpectedTestError.new("everything is broken")).at_least_once AddToBufferJob.perform_later "hey!" @@ -67,6 +67,23 @@ class WorkerTest < ActiveSupport::TestCase SolidQueue.on_thread_error = original_on_thread_error end + test "errors on claimed executions are reported via Rails error subscriber" do + subscriber = ErrorBuffer.new + Rails.error.subscribe(subscriber) + + RaisingJob.perform_later(ExpectedTestError, "B") + + @worker.start + + wait_for_jobs_to_finish_for(1.second) + @worker.wake_up + + assert_equal 1, subscriber.errors.count + assert_equal "This is a ExpectedTestError exception", subscriber.messages.first + ensure + Rails.error.unsubscribe(subscriber) if Rails.error.respond_to?(:unsubscribe) + end + test "claim and process more enqueued jobs than the pool size allows to process at once" do 5.times do |i| StoreResultJob.perform_later(:paused, pause: 0.1.second) @@ -154,6 +171,26 @@ class WorkerTest < ActiveSupport::TestCase SolidQueue.process_heartbeat_interval = old_heartbeat_interval end + test "sleeps `10.minutes` if at capacity" do + 3.times { |i| StoreResultJob.perform_later(i, pause: 1.second) } + + @worker.expects(:interruptible_sleep).with(10.minutes).at_least_once + @worker.expects(:interruptible_sleep).with(@worker.polling_interval).never + + @worker.start + sleep 1.second + end + + test "sleeps `polling_interval` if 
worker not at capacity" do + 2.times { |i| StoreResultJob.perform_later(i, pause: 1.second) } + + @worker.expects(:interruptible_sleep).with(@worker.polling_interval).at_least_once + @worker.expects(:interruptible_sleep).with(10.minutes).never + + @worker.start + sleep 1.second + end + private def with_polling(silence:) old_silence_polling, SolidQueue.silence_polling = SolidQueue.silence_polling, silence