Skip to content

Commit

Permalink
Upcasting records to newer types as transformation in pipeline
Browse files Browse the repository at this point in the history
Usage:

  class Mapper < PipelineMapper
    def initialize(upcast_map: {})
      super(Pipeline.new(
        Transformation::Upcast.new(upcast_map),
      ))
    end
  end

  RubyEventStore::Client.new(
    mapper: Mapper.new(upcast_map: {
      'OldEventType' => lambda { |record|
        Record.new(
          event_type: 'NewEventType',
          data:        ...,
          metadata:    record.metadata,
          timestamp:   record.timestamp,
          valid_at:    record.valid_at,
          event_id:    record.event_id
        )
      }
    }),
    repository: ...
  )
  • Loading branch information
mostlyobvious committed Nov 3, 2020
1 parent ea47474 commit 8c3d9d9
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 0 deletions.
1 change: 1 addition & 0 deletions ruby_event_store/lib/ruby_event_store.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
require 'ruby_event_store/mappers/transformation/domain_event'
require 'ruby_event_store/mappers/transformation/encryption'
require 'ruby_event_store/mappers/transformation/event_class_remapper'
require 'ruby_event_store/mappers/transformation/upcast'
require 'ruby_event_store/mappers/transformation/proto_event'
require 'ruby_event_store/mappers/transformation/protobuf_encoder'
require 'ruby_event_store/mappers/transformation/protobuf_nested_struct_metadata'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# frozen_string_literal: true

module RubyEventStore
module Mappers
module Transformation
class Upcast
class RecordUpcaster
def initialize(upcast_map)
@upcast_map = upcast_map
end

def call(record)
identity = lambda { |r| r }
new_record = @upcast_map.fetch(record.event_type, identity)[record]
if new_record == record
record
else
call(new_record)
end
end
end

def initialize(upcast_map)
@record_upcaster = RecordUpcaster.new(upcast_map)
end

def dump(record)
record
end

def load(record)
@record_upcaster.call(record)
end
end
end
end
end
57 changes: 57 additions & 0 deletions ruby_event_store/spec/mappers/transformation/upcast_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
require 'spec_helper'

module RubyEventStore
module Mappers
module Transformation
RSpec.describe Upcast do
let(:time) { Time.now.utc }
let(:uuid) { SecureRandom.uuid }

let(:record_v1) do
Record.new(
event_id: uuid,
metadata: {some: 'meta'},
data: [{some: 'value'}],
event_type: 'record.v1',
timestamp: time,
valid_at: time,
)
end
let(:record_v2) do
Record.new(
event_id: uuid,
metadata: {some: 'meta'},
data: {as_hash: [{some: 'value'}]},
event_type: 'record.v2',
timestamp: time,
valid_at: time,
)
end
let(:upcast_map) do
{
'record.v1' => lambda do |r|
Record.new(
event_id: r.event_id,
metadata: r.metadata,
timestamp: r.timestamp,
valid_at: r.valid_at,
event_type: 'record.v2',
data: {as_hash: r.data}
)
end
}
end

specify "#dump" do
expect(Upcast.new(upcast_map).dump(record_v1)).to eq(record_v1)
expect(Upcast.new(upcast_map).dump(record_v2)).to eq(record_v2)
end

specify "#load" do
expect(Upcast.new(upcast_map).load(record_v1)).to eq(record_v2)
expect(Upcast.new(upcast_map).load(record_v2)).to eq(record_v2)
end
end
end
end
end

0 comments on commit 8c3d9d9

Please sign in to comment.