Skip to content

Commit

Permalink
Merge pull request #126 from cookpad/r/dispatch-many
Browse files Browse the repository at this point in the history
Add support to deliver messages in batches
  • Loading branch information
robertomiranda authored Apr 30, 2024
2 parents 21f2106 + 9c46320 commit 2c66bb8
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 0 deletions.
4 changes: 4 additions & 0 deletions lib/streamy/dispatcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ def dispatch
Streamy.message_bus.deliver(**message_params)
end

def self.dispatch_many(events)
Streamy.message_bus.deliver_many(events.map(&:to_message))
end

private

attr_reader :event
Expand Down
4 changes: 4 additions & 0 deletions lib/streamy/event.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ def self.publish(**args)
new(**args).publish
end

def self.publish_many(events)
Streamy.dispatcher.dispatch_many(events)
end

priority :standard

def publish
Expand Down
5 changes: 5 additions & 0 deletions lib/streamy/message_buses/kafka_message_bus.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ def deliver(key:, topic:, payload:, priority:)
end
end

def deliver_many(messages)
messages = messages.map { |message| message.except(:priority) }
sync_producer.produce_many_sync(messages)
end

def shutdown
async_producer.close if async_producer?
sync_producers.map(&:close)
Expand Down
4 changes: 4 additions & 0 deletions lib/streamy/message_buses/message_bus.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ class MessageBus
def deliver(key:, topic:, payload:, priority:)
# NOOP: Implement delivery logic
end

def deliver_many(messages)
# NOOP: Implement delivery logic
end
end
end
end
5 changes: 5 additions & 0 deletions lib/streamy/test_dispatcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,10 @@ def dispatch
events << event_params
messages << message_params
end

def self.dispatch_many(events)
self.events += events.map(&:to_params)
self.messages += events.map(&:to_message)
end
end
end

0 comments on commit 2c66bb8

Please sign in to comment.