diff --git a/lib/sequent/core/aggregate_snapshotter.rb b/lib/sequent/core/aggregate_snapshotter.rb index 7a20955f..df9d8415 100644 --- a/lib/sequent/core/aggregate_snapshotter.rb +++ b/lib/sequent/core/aggregate_snapshotter.rb @@ -1,18 +1,20 @@ module Sequent module Core + + ## + # Take up to `limit` snapshots when needed. Throws `:done` when done. + # class SnapshotCommand < Sequent::Core::BaseCommand attrs limit: Integer end - class AggregateSnapshotter < BaseCommandHandler + ## + # Take snapshot of given aggregate + class TakeSnapshot < Sequent::Core::Command + end - def self.handles_message?(message) - message.is_a? SnapshotCommand - end + class AggregateSnapshotter < BaseCommandHandler - ## - # Take up to `limit` snapshots when needed. Throws `:done` when done. - # on SnapshotCommand do |command| aggregate_ids = repository.event_store.aggregates_that_need_snapshots(@last_aggregate_id, command.limit) aggregate_ids.each do |aggregate_id| @@ -22,6 +24,10 @@ def self.handles_message?(message) throw :done if @last_aggregate_id.nil? end + on TakeSnapshot do |command| + take_snapshot!(command.aggregate_id) + end + def take_snapshot!(aggregate_id) aggregate = @repository.load_aggregate(aggregate_id) Sequent.logger.info "Taking snapshot for aggregate #{aggregate}" diff --git a/lib/sequent/core/snapshots.rb b/lib/sequent/core/snapshots.rb deleted file mode 100644 index 5bdd4c55..00000000 --- a/lib/sequent/core/snapshots.rb +++ /dev/null @@ -1,23 +0,0 @@ -module Sequent - module Core - class Snapshots - - def aggregates_that_need_snapshots(events_since_last_snapshot: 20, limit: 10, last_aggregate_id: nil) - query = %Q{ -SELECT aggregate_id - FROM event_records events - WHERE aggregate_id > '#{last_aggregate_id}' - GROUP BY aggregate_id -HAVING MAX(sequence_number) - (COALESCE((SELECT MAX(sequence_number) - FROM event_records snapshots - WHERE event_type = 'Sequent::Core::SnapshotEvent' - AND snapshots.aggregate_id = events.aggregate_id), 0)) > #{events_since_last_snapshot} - ORDER BY aggregate_id - LIMIT #{limit}; -} - @record_class.connection.select_all(query).to_a - end - - end - end -end diff --git a/spec/lib/sequent/core/aggregate_snapshotter_spec.rb b/spec/lib/sequent/core/aggregate_snapshotter_spec.rb new file mode 100644 index 00000000..43b01ad0 --- /dev/null +++ b/spec/lib/sequent/core/aggregate_snapshotter_spec.rb @@ -0,0 +1,40 @@ +require 'spec_helper' + +describe Sequent::Core::AggregateSnapshotter do + class MyEvent < Sequent::Core::Event; end + class MyAggregate < Sequent::Core::AggregateRoot; end + + let(:command_handler) { described_class.new } + let(:event_store) { Sequent::configuration.event_store } + let(:aggregate_id) { Sequent.new_uuid } + + let(:take_snapshot) { Sequent::Core::TakeSnapshot.new(aggregate_id: aggregate_id) } + + around do |example| + commands_handlers = Sequent::configuration.command_handlers + begin + example.run + ensure + Sequent::configuration.command_handlers = commands_handlers + end + end + + before :each do + Sequent::configuration.command_handlers << described_class.new + event_store.commit_events( + Sequent::Core::CommandRecord.new, + [ + [ + Sequent::Core::EventStream.new(aggregate_type: 'MyAggregate', aggregate_id: aggregate_id, snapshot_threshold: 1), + [MyEvent.new(aggregate_id: aggregate_id, sequence_number: 1)] + ] + ] + ) + end + + it 'can take a snapshot' do + Sequent.command_service.execute_commands(*take_snapshot) + + expect(Sequent::Core::EventRecord.last.event_type).to eq Sequent::Core::SnapshotEvent.name + end +end diff --git a/spec/lib/sequent/support/database_spec.rb b/spec/lib/sequent/support/database_spec.rb index 65ff5c39..03e20db8 100644 --- a/spec/lib/sequent/support/database_spec.rb +++ b/spec/lib/sequent/support/database_spec.rb @@ -116,14 +116,6 @@ def table_exists?(table_name) results = ActiveRecord::Base.connection.select_all %Q( SELECT 1 FROM pg_tables WHERE tablename = '#{table_name}' -) - results.count == 1 - end - - def table_exists?(table_name) - results = ActiveRecord::Base.connection.select_all %Q( -SELECT 1 FROM pg_tables - WHERE tablename = '#{table_name}' ) results.count == 1 end