Skip to content

Commit

Permalink
Add support for ActiveRecord::Relation#load_async method. (#6)
Browse files Browse the repository at this point in the history
  • Loading branch information
maxshend authored Apr 23, 2022
1 parent 0d08228 commit b535e5a
Show file tree
Hide file tree
Showing 8 changed files with 103 additions and 28 deletions.
3 changes: 2 additions & 1 deletion io_to_response_payload_ratio.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,6 @@ Gem::Specification.new do |spec|

spec.required_ruby_version = ">= 2.6.0"

spec.add_dependency "rails", ">= 6.0"
rails_version = ENV["RAILS_VERSION"] || ">= 6.0"
spec.add_dependency "rails", rails_version
end
25 changes: 23 additions & 2 deletions lib/io_to_response_payload_ratio/adapters/active_record_adapter.rb
Original file line number Diff line number Diff line change
@@ -1,16 +1,37 @@
# frozen_string_literal: true

require "io_to_response_payload_ratio/patches/abstract_adapter_patch"
require "io_to_response_payload_ratio/patches/future_result_patch"

module IoToResponsePayloadRatio
class ActiveRecordAdapter < BaseAdapter
def self.kind
:active_record
class << self
def kind
:active_record
end

def aggregate_result(rows:)
return unless IoToResponsePayloadRatio.aggregator.active?

# `.flatten.join.bytesize` would look prettier,
# but it makes a lot of unnecessary allocations.
io_payload_size = rows.sum(0) do |row|
row.sum(0) do |val|
(String === val ? val : val.to_s).bytesize
end
end

IoToResponsePayloadRatio.aggregator.increment(kind, io_payload_size)
end
end

def initialize!
ActiveSupport.on_load(:active_record) do
ActiveRecord::ConnectionAdapters::AbstractAdapter.prepend(AbstractAdapterPatch)

if Rails::VERSION::MAJOR >= 7
ActiveRecord::FutureResult.prepend(FutureResultPatch)
end
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,7 @@
module IoToResponsePayloadRatio
module AbstractAdapterPatch
def build_result(*args, **kwargs, &block)
if IoToResponsePayloadRatio.aggregator.active?
# `.flatten.join.bytesize` would look prettier,
# but it makes a lot of unnecessary allocations.
io_payload_size = kwargs[:rows].sum(0) do |row|
row.sum(0) do |val|
(String === val ? val : val.to_s).bytesize
end
end

IoToResponsePayloadRatio.aggregator
.increment(ActiveRecordAdapter.kind, io_payload_size)
end
ActiveRecordAdapter.aggregate_result rows: kwargs[:rows]

super
end
Expand Down
15 changes: 15 additions & 0 deletions lib/io_to_response_payload_ratio/patches/future_result_patch.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# frozen_string_literal: true

module IoToResponsePayloadRatio
module FutureResultPatch
def result
# @event_buffer is used to send ActiveSupport notifications related to async queries
return super unless @event_buffer

res = super
ActiveRecordAdapter.aggregate_result rows: res.rows

res
end
end
end
4 changes: 4 additions & 0 deletions spec/dummy/config/application.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,9 @@ class Application < Rails::Application
config.logger = Logger.new("/dev/null")
config.api_only = true
config.active_record.legacy_connection_handling = false

if Rails::VERSION::MAJOR >= 7
config.active_record.async_query_executor = :global_thread_pool
end
end
end
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
# frozen_string_literal: true

require_relative "../../support/helpers/async_helper"

RSpec.describe IoToResponsePayloadRatio::ActiveRecordAdapter do
include Helpers::AsyncHelper

let(:aggregator) { IoToResponsePayloadRatio.aggregator }

with_model :Fake, scope: :all do
Expand All @@ -27,23 +31,41 @@
aggregator.stop!
end

context "when aggregator is inactive" do
before do
aggregator.stop!
describe "without async queries" do
context "when aggregator is inactive" do
before do
aggregator.stop!
end

it "does nothing" do
expect(aggregator).not_to receive(:increment)

Fake.all.to_a
end
end

it "does nothing" do
expect(aggregator).not_to receive(:increment)
it "increments aggregator by query result's bytesize" do
allow(aggregator).to receive(:increment)

Fake.all.to_a
bytesize = Fake.pluck(:id, :name).flatten.join.bytesize

expect(aggregator).to have_received(:increment).with(described_class.kind, bytesize)
end
end

it "increments aggregator by query result's bytesize" do
allow(aggregator).to receive(:increment)
if Rails::VERSION::MAJOR >= 7
context "when load_async is used" do
let!(:bytesize) { Fake.pluck(:id, :name).flatten.join.bytesize }

it "increments aggregator by query result's bytesize", skip_transaction: true do
allow(aggregator).to receive(:increment)

bytesize = Fake.pluck(:id, :name).flatten.join.bytesize
relation = Fake.all.load_async
wait_for_async_query(relation)
relation.to_a

expect(aggregator).to have_received(:increment).with(described_class.kind, bytesize)
expect(aggregator).to have_received(:increment).with(described_class.kind, bytesize)
end
end
end
end
10 changes: 7 additions & 3 deletions spec/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,16 @@
config.extend WithModel

config.before(:suite) do
DatabaseCleaner.strategy = :transaction
DatabaseCleaner.clean_with(:truncation)
end

config.around(:each) do |example|
DatabaseCleaner.cleaning { example.run }
config.before(:each) do |e|
DatabaseCleaner.strategy = e.metadata[:skip_transaction] ? :truncation : :transaction
DatabaseCleaner.start
end

config.append_after(:each) do
DatabaseCleaner.clean
end

config.expect_with :rspec do |c|
Expand Down
19 changes: 19 additions & 0 deletions spec/support/helpers/async_helper.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# frozen_string_literal: true

module Helpers
module AsyncHelper
def wait_for_async_query(relation, timeout: 5)
if !relation.connection.async_enabled? || relation.instance_variable_get(:@records)
raise ArgumentError, "async hasn't been enabled or used"
end

future_result = relation.instance_variable_get(:@future_result)
(timeout * 100).times do
return relation unless future_result.pending?
sleep 0.01
end

raise Timeout::Error, "The async executor wasn't drained after #{timeout} seconds"
end
end
end

0 comments on commit b535e5a

Please sign in to comment.