Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make event query faster by eliminating the subquery check for every row. #95

Merged
merged 1 commit into from
Oct 9, 2017
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 15 additions & 10 deletions lib/sequent/core/event_store.rb
Original file line number Diff line number Diff line change
Expand Up @@ -70,16 +70,8 @@ def load_events_for_aggregates(aggregate_ids)

streams = stream_record_class.where(aggregate_id: aggregate_ids)

events = event_record_class.connection.select_all(%Q{
SELECT event_type, event_json
FROM #{quote_table_name event_record_class.table_name} AS o
WHERE aggregate_id in (#{aggregate_ids.map{ |aggregate_id| quote(aggregate_id)}.join(",")})
AND sequence_number >= COALESCE((SELECT MAX(sequence_number)
FROM #{quote_table_name event_record_class.table_name} AS i
WHERE event_type = #{quote snapshot_event_class.name}
AND i.aggregate_id = o.aggregate_id), 0)
ORDER BY sequence_number ASC, (CASE event_type WHEN #{quote snapshot_event_class.name} THEN 0 ELSE 1 END) ASC
}).map! do |event_hash|
query = aggregate_ids.uniq.map { |aggregate_id| aggregate_query(aggregate_id) }.join(" UNION ALL ")
events = event_record_class.connection.select_all(query).map! do |event_hash|
deserialize_event(event_hash)
end

Expand All @@ -88,6 +80,19 @@ def load_events_for_aggregates(aggregate_ids)
.map { |aggregate_id, _events| [streams.find { |stream_record| stream_record.aggregate_id == aggregate_id }.event_stream, _events] }
end

def aggregate_query(aggregate_id)
%Q{(
SELECT event_type, event_json
FROM #{quote_table_name event_record_class.table_name} AS o
WHERE aggregate_id = #{quote(aggregate_id)}
AND sequence_number >= COALESCE((SELECT MAX(sequence_number)
FROM #{quote_table_name event_record_class.table_name} AS i
WHERE event_type = #{quote snapshot_event_class.name}
AND i.aggregate_id = #{quote(aggregate_id)}), 0)
ORDER BY sequence_number ASC, (CASE event_type WHEN #{quote snapshot_event_class.name} THEN 0 ELSE 1 END) ASC
)}
end

def stream_exists?(aggregate_id)
stream_record_class.exists?(aggregate_id: aggregate_id)
end
Expand Down