An Event-Driven Architecture framework to build reactive apps.
Onyx::EDA is an Event-Driven Architecture framework. It lets you emit events and subscribe to them.
Currently the framework has these channels implemented:
- Memory channel
- Redis channel (working on Redis streams)
Onyx::EDA is a real-time events framework. It does not process events that happened in the past, and it currently makes no reliability guarantees for channels that depend on a third-party service (e.g. Redis).
👍 The framework is a great choice for reactive and/or distributed applications: it lets you build multiple loosely-coupled components which do not interact with each other directly, but rely on events instead.
👎 However, Onyx::EDA is not a good choice for tasks requiring reliability, for example, background processing. If a Redis consumer dies during processing, the event is likely to not be processed. This behaviour may change in the future.
Add this to your application's shard.yml:
dependencies:
  onyx:
    github: onyxframework/onyx
    version: ~> 0.6.0
  onyx-eda:
    github: onyxframework/eda
    version: ~> 0.4.0
This shard follows Semantic Versioning v2.0.0, so check releases and change the version accordingly.
Note that until Crystal is officially released, this shard will be in beta state (0.*.*), with every minor release considered breaking. For example, 0.1.0 → 0.2.0 is breaking and 0.1.0 → 0.1.1 is not.
First of all, require the channels you need:
require "onyx/eda/memory"
require "onyx/eda/redis"
Then define events to emit:
struct MyEvent
  include Onyx::EDA::Event

  getter foo

  def initialize(@foo : String)
  end
end
To subscribe, define a block which will be run on every incoming event:
Onyx::EDA.memory.subscribe(MyEvent) do |event|
  pp event.foo
end
Subscribing and emitting are asynchronous operations. You must then yield control with sleep or Fiber.yield to let notifications reach their subscriptions:
Onyx::EDA.memory.emit(MyEvent.new("bar"))
sleep(1)
Output, as expected:
bar
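The text above mentions Fiber.yield as an alternative to sleep. A minimal sketch of that variant follows, using only the calls already shown in this README. The number of yields is an assumption: delivery may take more than one fiber switch, so a single yield is not guaranteed to be enough.

```crystal
require "onyx/eda/memory"

struct MyEvent
  include Onyx::EDA::Event

  getter foo

  def initialize(@foo : String)
  end
end

Onyx::EDA.memory.subscribe(MyEvent) do |event|
  pp event.foo
end

Onyx::EDA.memory.emit(MyEvent.new("bar"))

# Hand control to the scheduler instead of sleeping for a fixed time.
# Assumption: yielding a few times gives the notification fibers a
# chance to run; the exact number needed is not documented here.
3.times { Fiber.yield }
```

If the subscription block does not run, fall back to sleep as in the example above.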
You can cancel a subscription as well:
sub = Onyx::EDA.memory.subscribe(MyEvent) do |event|
  pp event.foo
end

sub.unsubscribe
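To see the effect of cancelling, here is a small sketch that emits one event before and one after unsubscribe. It assumes the MyEvent struct defined earlier and uses only the calls shown in this README; the 100 ms pauses are arbitrary values chosen to let the memory channel deliver.

```crystal
require "onyx/eda/memory"

struct MyEvent
  include Onyx::EDA::Event

  getter foo

  def initialize(@foo : String)
  end
end

sub = Onyx::EDA.memory.subscribe(MyEvent) do |event|
  pp event.foo
end

Onyx::EDA.memory.emit(MyEvent.new("first"))
sleep(100.milliseconds) # Let the first event reach the subscription

sub.unsubscribe

# The subscription is cancelled, so this event should not be printed
Onyx::EDA.memory.emit(MyEvent.new("second"))
sleep(100.milliseconds)
```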
You can filter incoming events and run the subscription block only if the event's getters match the filter:
# Would only put "bar"
Onyx::EDA.memory.subscribe(MyEvent, foo: "bar") do |event|
  pp event.foo
end

Onyx::EDA.memory.emit(MyEvent.new("qux")) # Would not notify the subscription above
Onyx::EDA.memory.emit(MyEvent.new("bar")) # OK, condition is met
You can create an event consumption instead of a subscription. From docs:
Consumption differs from subscription in a way that only a single consuming subscription instance with certain consumer_id among all this channel subscribers would be notified about an event after it successfully acquires a lock. The lock implementation differs in channels.
In this code only one "bar" will be put, because both subscriptions have "MyConsumer" as the consumer ID:
sub1 = Onyx::EDA.memory.subscribe(MyEvent, "MyConsumer") do |event|
  puts event.foo
end

sub2 = Onyx::EDA.memory.subscribe(MyEvent, "MyConsumer") do |event|
  puts event.foo
end

Onyx::EDA.memory.emit(MyEvent.new("bar"))
Consuming works as expected with the Redis channel as well, where it relies on Redis streams. However, if a consumer crashes, no other consumer with the same ID will try to process that event again (i.e. the behavior is unreliable). This may change in the future.
Note that you cannot use event filters while consuming.
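The difference between subscribing and consuming can be illustrated with a toy model in plain Crystal. This is not Onyx::EDA code and not how its channels are implemented: ToyChannel, its methods and the "first handler wins" rule are all hypothetical stand-ins for the lock described above.

```crystal
# Toy model only: this is NOT how Onyx::EDA is implemented.
class ToyChannel
  alias Handler = Proc(String, Nil)

  def initialize
    @subscribers = [] of Handler
    @consumers = Hash(String, Array(Handler)).new
  end

  # Every subscriber is notified about every event.
  def subscribe(&block : String -> Nil)
    @subscribers << block
  end

  # Handlers sharing a consumer ID compete for each event.
  def consume(consumer_id : String, &block : String -> Nil)
    handlers = @consumers[consumer_id] ||= [] of Handler
    handlers << block
  end

  def emit(payload : String)
    @subscribers.each &.call(payload)
    # The "lock" is modelled by letting only the first handler per ID win.
    @consumers.each_value { |handlers| handlers.first.call(payload) }
  end
end

channel = ToyChannel.new
channel.subscribe { |payload| puts "subscriber 1: #{payload}" }
channel.subscribe { |payload| puts "subscriber 2: #{payload}" }
channel.consume("MyConsumer") { |payload| puts "consumer 1: #{payload}" }
channel.consume("MyConsumer") { |payload| puts "consumer 2: #{payload}" }
channel.emit("bar")
```

Both subscribers are called, but only one of the two handlers registered under "MyConsumer" is. A real channel decides the winner by acquiring a lock rather than by registration order.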
It is possible to await a certain event in a blocking manner:
# Will block the execution until the event is received
Onyx::EDA.memory.await(MyEvent) do |event|
  pp event.foo
end
It is particularly useful in select blocks:
select
when event = Onyx::EDA.memory.await(MyEvent)
  pp event.foo
when Timer.new(30.seconds)
  raise "Timeout!"
end
💡 See timer.cr for a timer shard.
You can use filters with awaiting, making it possible to wait for a specific event to happen:
record MyEventHandled, parent_event_id : UUID do
  include Onyx::EDA::Event
end

event = Onyx::EDA.redis.emit(MyEvent.new("bar"))

select
when event = Onyx::EDA.redis.await(MyEventHandled, parent_event_id: event.event_id)
  puts "Handled"
when Timer.new(30.seconds)
  raise "Timeout!"
end
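The example above only shows the awaiting side. The other side, typically a separate process, could look roughly like the sketch below. It assumes a reachable Redis instance with the channel's default configuration, that event_id is a UUID (as the filter above implies), and a made-up consumer ID "MyHandler".

```crystal
require "uuid"
require "onyx/eda/redis"

struct MyEvent
  include Onyx::EDA::Event

  getter foo

  def initialize(@foo : String)
  end
end

record MyEventHandled, parent_event_id : UUID do
  include Onyx::EDA::Event
end

# Consume MyEvent under a hypothetical consumer ID, so that only one
# worker with this ID handles each event
Onyx::EDA.redis.subscribe(MyEvent, "MyHandler") do |event|
  puts "Processing #{event.foo}"

  # Report completion, referencing the ID of the event just handled
  Onyx::EDA.redis.emit(MyEventHandled.new(event.event_id))
end

sleep # Keep the process alive to receive events
```

As noted earlier, a consumer that crashes mid-processing will not emit MyEventHandled, so the awaiting side should keep its timeout branch.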
You can include the Subscriber(T) and Consumer(T) modules into an object, turning it into an event (T) subscriber or consumer. It must implement handle(event : T) and be explicitly subscribed to a channel.
class Actor::Logger
  include Onyx::EDA::Subscriber(Event::User::Registered)
  include Onyx::EDA::Consumer(Event::Payment::Successfull)

  # This method will be called in *all* Actor::Logger instances
  def handle(event : Event::User::Registered)
    log_into_terminal("New user with id #{event.id}")
  end

  # This method will be called in only *one* Actor::Logger instance
  def handle(event : Event::Payment::Successfull)
    send_email("admin@example.com", "New payment of $#{event.amount}")
  end
end
actor = Actor::Logger.new
actor.subscribe(Onyx::EDA.memory) # Non-blocking method
actor.unsubscribe(Onyx::EDA.memory) # Can be unsubscribed as well
The documentation is available online at docs.onyxframework.org/eda.
There are multiple places to talk about Onyx:
This shard is maintained by me, Vlad Faust, a passionate developer with years of programming and product experience. I love creating Open-Source and I want to be able to work full-time on Open-Source projects.
I will do my best to answer your questions in the free communication channels above, but if you want prioritized support, then please consider becoming my patron. Your issues will be labeled with your patronage status, and if you have a sponsor tier, then you and your team will be able to communicate with me privately in Twist. There are other perks to consider, so please don't hesitate to check my Patreon page.
You could also help me a lot if you star this GitHub repository and spread the word about Crystal and Onyx! 📣
- Fork it ( https://github.com/onyxframework/eda/fork )
- Create your feature branch (git checkout -b my-new-feature)
- Commit your changes (git commit -am 'feat: some feature') using Angular style commits
- Push to the branch (git push origin my-new-feature)
- Create a new Pull Request
- Vlad Faust - creator and maintainer
This software is licensed under the MIT License.