Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor adapter enqueing methods; expand Readme, tests, editorconfig #3

Merged
merged 1 commit into from
Mar 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# EditorConfig is awesome: http://EditorConfig.org

root = true

[*]
charset = utf-8
end_of_line = lf
indent_size = 2
indent_style = space
insert_final_newline = true
68 changes: 61 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,30 +1,84 @@
# GoodJob
Short description and motivation.

GoodJob is a multithreaded, Postgres-based ActiveJob backend for Ruby on Rails.

## Usage
How to use my plugin.

1. Create a database migration:
```bash
bin/rails g migration CreateGoodJobs
```

And then add to the newly created file:

```ruby
class CreateGoodJobs < ActiveRecord::Migration[6.0]
def change
enable_extension 'pgcrypto'

create_table :good_jobs, id: :uuid do |t|
t.timestamps

t.text :queue_name
t.integer :priority
t.jsonb :serialized_params
t.timestamp :scheduled_at
end
end
end
```
1. Configure the ActiveJob adapter:
```ruby
# config/environments/production.rb
config.active_job.queue_adapter = GoodJob::Adapter.new

# config/environments/development.rb
config.active_job.queue_adapter = GoodJob::Adapter.new(inline: true)
```

1. In production, the scheduler is designed to run in its own process:

```ruby
# TBD
```

## Installation
Add this line to your application's Gemfile:

```ruby
gem 'good_job'
gem 'good_job', github: 'bensheldon/good_job'
```

And then execute:
```bash
$ bundle
```

Or install it yourself as:
## Development

To run tests:

```bash
$ gem install good_job
# Clone the repository locally
$ git clone git@github.com:bensheldon/good_job.git

# Set up the local environment
$ bin/setup_test

# Run the tests
$ bin/rspec
```

## Development
For developing locally within another Ruby on Rails project:

```bash
$ bin/test
# Within Ruby on Rails directory...
$ bundle config local.good_job /path/to/local/git/repository

# Confirm that the local copy is used
$ bundle install

# => Using good_job 0.1.0 from https://github.com/bensheldon/good_job.git (at /Users/You/Projects/good_job@dc57fb0)
```

## Contributing
Expand Down
17 changes: 6 additions & 11 deletions lib/good_job/adapter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,28 +10,23 @@ def inline?
end

def enqueue(job)
good_job = GoodJob::Job.create(
queue_name: job.queue_name,
priority: job.priority,
serialized_params: job.serialize
)

@scheduler.enqueue(good_job) if inline?
enqueue_at(job, nil)
end

def enqueue_at(job, timestamp)
good_job = GoodJob::Job.create(
params = {
queue_name: job.queue_name,
priority: job.priority,
serialized_params: job.serialize,
scheduled_at: Time.at(timestamp)
)
}
params[:scheduled_at] = Time.at(timestamp) if timestamp

good_job = GoodJob::Job.create(params)
@scheduler.enqueue(good_job) if inline?
end

def shutdown(wait: true)
@scheduler.shutdown(wait: wait) if @scheduler
@scheduler&.shutdown(wait: wait)
end
end
end
81 changes: 58 additions & 23 deletions spec/good_job/scheduler_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,45 +23,80 @@ def perform(*args, **kwargs)
THREAD_JOBS[thread_name] << provider_job_id
end
end)
end

let(:adapter) { GoodJob::Adapter.new }
stub_const 'ErrorJob', (Class.new(ApplicationJob) do
RetryableError = Class.new(StandardError)

let(:number_of_jobs) { 250 }
self.queue_name = 'test'
self.priority = 50
retry_on(RetryableError, wait: 0, attempts: 3) do |job, error|
puts "FAILED"
end

let!(:good_jobs) do
number_of_jobs.times do |i|
ExampleJob.perform_later(i)
end
def perform(*args, **kwargs)
thread_name = Thread.current.name || Thread.current.object_id

RUN_JOBS << { args: args, kwargs: kwargs }
THREAD_JOBS[thread_name] << provider_job_id

raise RetryableError
end
end), transfer_nested_constants: true
end

it 'pops items off of the queue and runs them' do
scheduler = GoodJob::Scheduler.new
let(:adapter) { GoodJob::Adapter.new }

context 'large number of jobs' do
let(:number_of_jobs) { 250 }

Timeout.timeout(5) do
sleep(0.5) until GoodJob::Job.count == 0
let!(:good_jobs) do
number_of_jobs.times do |i|
ExampleJob.perform_later(i)
end
end

if RUN_JOBS.size != number_of_jobs
jobs = THREAD_JOBS.values.flatten
it 'pops items off of the queue and runs them' do
scheduler = GoodJob::Scheduler.new

jobs_tally = jobs.each_with_object(Hash.new(0)) do |job_id, hash|
hash[job_id] += 1
Timeout.timeout(5) do
sleep(0.5) until GoodJob::Job.count == 0
end

rerun_jobs = jobs_tally.select { |key, value| value > 1 }
if RUN_JOBS.size != number_of_jobs
jobs = THREAD_JOBS.values.flatten

rerun_jobs.each do |job_id, tally|
rerun_threads = THREAD_JOBS.select { |thread, jobs| jobs.include? job_id }.keys
jobs_tally = jobs.each_with_object(Hash.new(0)) do |job_id, hash|
hash[job_id] += 1
end

puts "Ran job id #{job_id} for #{tally} times on threads #{rerun_threads}"
rerun_jobs = jobs_tally.select { |key, value| value > 1 }

rerun_jobs.each do |job_id, tally|
rerun_threads = THREAD_JOBS.select { |thread, jobs| jobs.include? job_id }.keys

puts "Ran job id #{job_id} for #{tally} times on threads #{rerun_threads}"
end
end

scheduler.shutdown

expect(GoodJob::Job.count).to eq(0), -> { "Unworked jobs are #{GoodJob::Job.all.map(&:id)}" }
expect(rerun_jobs).to be_nil
expect(RUN_JOBS.size).to eq number_of_jobs
end
end

context 'jobs with errors' do
let!(:jobs) { ErrorJob.perform_later }

scheduler.shutdown
it "handles and retries jobs with errors" do
scheduler = GoodJob::Scheduler.new

expect(GoodJob::Job.count).to eq(0), -> { "Unworked jobs are #{GoodJob::Job.all.map(&:id)}" }
expect(rerun_jobs).to be_nil
expect(RUN_JOBS.size).to eq number_of_jobs
Timeout.timeout(5) do
sleep(0.5) until GoodJob::Job.count == 0
end

scheduler.shutdown
end
end
end