diff --git a/lib/sequent/core/event_store.rb b/lib/sequent/core/event_store.rb index 7ec32c8e..7d55fe6b 100644 --- a/lib/sequent/core/event_store.rb +++ b/lib/sequent/core/event_store.rb @@ -70,16 +70,8 @@ def load_events_for_aggregates(aggregate_ids) streams = stream_record_class.where(aggregate_id: aggregate_ids) - events = event_record_class.connection.select_all(%Q{ -SELECT event_type, event_json - FROM #{quote_table_name event_record_class.table_name} AS o -WHERE aggregate_id in (#{aggregate_ids.map{ |aggregate_id| quote(aggregate_id)}.join(",")}) -AND sequence_number >= COALESCE((SELECT MAX(sequence_number) - FROM #{quote_table_name event_record_class.table_name} AS i - WHERE event_type = #{quote snapshot_event_class.name} - AND i.aggregate_id = o.aggregate_id), 0) -ORDER BY sequence_number ASC, (CASE event_type WHEN #{quote snapshot_event_class.name} THEN 0 ELSE 1 END) ASC -}).map! do |event_hash| + query = aggregate_ids.uniq.map { |aggregate_id| aggregate_query(aggregate_id) }.join(" UNION ALL ") + events = event_record_class.connection.select_all(query).map! do |event_hash| deserialize_event(event_hash) end @@ -88,6 +80,19 @@ def load_events_for_aggregates(aggregate_ids) .map { |aggregate_id, _events| [streams.find { |stream_record| stream_record.aggregate_id == aggregate_id }.event_stream, _events] } end + def aggregate_query(aggregate_id) + %Q{( +SELECT event_type, event_json + FROM #{quote_table_name event_record_class.table_name} AS o +WHERE aggregate_id = #{quote(aggregate_id)} +AND sequence_number >= COALESCE((SELECT MAX(sequence_number) + FROM #{quote_table_name event_record_class.table_name} AS i + WHERE event_type = #{quote snapshot_event_class.name} + AND i.aggregate_id = #{quote(aggregate_id)}), 0) +ORDER BY sequence_number ASC, (CASE event_type WHEN #{quote snapshot_event_class.name} THEN 0 ELSE 1 END) ASC +)} + end + def stream_exists?(aggregate_id) stream_record_class.exists?(aggregate_id: aggregate_id) end