Skip to content

Commit

Permalink
Merge pull request #92 from zilverline/add_new_snapshot_command
Browse files Browse the repository at this point in the history
Add new snapshot command
  • Loading branch information
lvonk authored Jul 18, 2017
2 parents bbddaa2 + 0b4e66a commit d5cf362
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 38 deletions.
20 changes: 13 additions & 7 deletions lib/sequent/core/aggregate_snapshotter.rb
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
module Sequent
module Core

##
# Take up to `limit` snapshots when needed. Throws `:done` when done.
#
class SnapshotCommand < Sequent::Core::BaseCommand
attrs limit: Integer
end

class AggregateSnapshotter < BaseCommandHandler
##
# Take snapshot of given aggregate
class TakeSnapshot < Sequent::Core::Command
end

def self.handles_message?(message)
message.is_a? SnapshotCommand
end
class AggregateSnapshotter < BaseCommandHandler

##
# Take up to `limit` snapshots when needed. Throws `:done` when done.
#
on SnapshotCommand do |command|
aggregate_ids = repository.event_store.aggregates_that_need_snapshots(@last_aggregate_id, command.limit)
aggregate_ids.each do |aggregate_id|
Expand All @@ -22,6 +24,10 @@ def self.handles_message?(message)
throw :done if @last_aggregate_id.nil?
end

on TakeSnapshot do |command|
take_snapshot!(command.aggregate_id)
end

def take_snapshot!(aggregate_id)
aggregate = @repository.load_aggregate(aggregate_id)
Sequent.logger.info "Taking snapshot for aggregate #{aggregate}"
Expand Down
23 changes: 0 additions & 23 deletions lib/sequent/core/snapshots.rb

This file was deleted.

40 changes: 40 additions & 0 deletions spec/lib/sequent/core/aggregate_snapshotter_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
require 'spec_helper'

describe Sequent::Core::AggregateSnapshotter do
class MyEvent < Sequent::Core::Event; end
class MyAggregate < Sequent::Core::AggregateRoot; end

let(:command_handler) { described_class.new }
let(:event_store) { Sequent::configuration.event_store }
let(:aggregate_id) { Sequent.new_uuid }

let(:take_snapshot) { Sequent::Core::TakeSnapshot.new(aggregate_id: aggregate_id) }

around do |example|
commands_handlers = Sequent::configuration.command_handlers
begin
example.run
ensure
Sequent::configuration.command_handlers = commands_handlers
end
end

before :each do
Sequent::configuration.command_handlers << described_class.new
event_store.commit_events(
Sequent::Core::CommandRecord.new,
[
[
Sequent::Core::EventStream.new(aggregate_type: 'MyAggregate', aggregate_id: aggregate_id, snapshot_threshold: 1),
[MyEvent.new(aggregate_id: aggregate_id, sequence_number: 1)]
]
]
)
end

it 'can take a snapshot' do
Sequent.command_service.execute_commands(*take_snapshot)

expect(Sequent::Core::EventRecord.last.event_type).to eq Sequent::Core::SnapshotEvent.name
end
end
8 changes: 0 additions & 8 deletions spec/lib/sequent/support/database_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -116,14 +116,6 @@ def table_exists?(table_name)
results = ActiveRecord::Base.connection.select_all %Q(
SELECT 1 FROM pg_tables
WHERE tablename = '#{table_name}'
)
results.count == 1
end

def table_exists?(table_name)
results = ActiveRecord::Base.connection.select_all %Q(
SELECT 1 FROM pg_tables
WHERE tablename = '#{table_name}'
)
results.count == 1
end
Expand Down

0 comments on commit d5cf362

Please sign in to comment.