Skip to content

Commit

Permalink
Custom Queue(M) for Mailbox(M)
Browse files Browse the repository at this point in the history
Mailboxes no longer inherit from `Channel(M)` but use a tailored
solution for agent mailboxes that's simpler and with less overall
allocations (thanks to Syn::Core).
  • Loading branch information
ysbaddaden committed Sep 16, 2023
1 parent 247a7e2 commit f753478
Show file tree
Hide file tree
Showing 6 changed files with 133 additions and 38 deletions.
15 changes: 7 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -168,14 +168,12 @@ properly reset themselves.

#### Mailbox

The `Earl::Mailbox(M)` module extends an agent with a `Channel(M)` along with
methods to `#send(M)` a message to an agent and to receive them (concurrency
safe).
The `Earl::Mailbox(M)` module extends an agent with a queue of `M` messages
along with methods to `#send(M)` a message to an agent and to receive them
(concurrency safe).

The module merely wraps a `Channel(M)` but proposes a standard structure for
agents to have an incoming mailbox of messages. All agents thus behave the
same, and we can assume that an agent that expects to receive messages has a
`#send(M)` method.
The goal is to provide a structured way for agents to communicate. No need to
spawn and share channels, you directly send a message to an agent for example.

An agent's mailbox will be closed when the agent is asked to stop. An agent can
simply loop over `#receive?` until it returns `nil`, without having to check for
Expand Down Expand Up @@ -305,7 +303,8 @@ I.e. they include `Earl::Mailbox(M)` or `Earl::Artist(M)`. They must also
override their `#reset` method to properly reset an agent.

Note that `Earl::Pool` will replace the workers' mailbox. All workers then share
a single `Channel(M)` for an exactly-once delivery of messages.
a single mailbox for an exactly-once delivery of messages (only one worker will
receive the message).

For example:

Expand Down
6 changes: 3 additions & 3 deletions SPEC.md
Original file line number Diff line number Diff line change
Expand Up @@ -271,8 +271,8 @@ to check for `running?`.

- `#mailbox=`

Direct accessor to swap the underlying `Channel(M)` object. The mailbox won't
be closed anymore when the agent is stopped, since the mailbox is now
Direct accessor to swap the underlying `Earl::Queue(M)` object. The mailbox
won't be closed anymore when the agent is stopped, since the mailbox is now
considered to be shared.

Despite having direct accessors to the mailbox, external agents aren't
Expand Down Expand Up @@ -522,7 +522,7 @@ to stop.

If a pool is itself supervised by an [`Earl::Supervisor`](#earlsupervisor)
agent, and the pool crashes, the supervisor will recycle and restart it, with
the original channel kept open. Pending messages will be dispatched once the
the original mailbox kept open. Pending messages will be dispatched once the
pool workers are restarted.

- `.new(capacity)`
Expand Down
8 changes: 3 additions & 5 deletions src/logger/async_dispatcher.cr
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ module Earl
module Logger
# Alternative to `Log::AsyncDispatcher` as an `Earl::Agent` that can be
# supervised.
#

# :nodoc:
class AsyncDispatcher
include ::Log::Dispatcher
Expand All @@ -14,7 +14,7 @@ module Earl
def initialize
# never close the mailbox: logs sent while the program ends could raise
# an exception because the mailbox is closed!
@mailbox_close_on_stop = false
@mailbox.close_on_stop = false
end

def finalize
Expand All @@ -39,9 +39,7 @@ module Earl

def terminate : Nil
# wait until all log messages have been processed before returning
queue = @mailbox.@queue.not_nil!

until queue.empty?
until @mailbox.empty?
sleep(0.seconds)
end
end
Expand Down
24 changes: 15 additions & 9 deletions src/mailbox.cr
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
require "./agent"
require "./queue"

module Earl
# Extends an agent with a mailbox that receives messages of type `M`.
Expand All @@ -7,37 +8,42 @@ module Earl
# take messages from the mailbox —undocumented because they're `protected`.
module Mailbox(M)
macro included
@mailbox = Channel(M).new(10)
@mailbox_close_on_stop = true
@mailbox = Earl::Queue(M).new
end

# Replaces the mailbox. The mailbox won't be closed automatically when the
# agent is asked to stop.
def mailbox=(@mailbox : Channel(M)) : Channel(M)
@mailbox_close_on_stop = false
def mailbox=(@mailbox : Queue(M)) : Queue(M)
@mailbox.close_on_stop = false
@mailbox
end

# Send a message to the `Agent`. Raises if the mailbox is closed.
# Sends a message to this `Agent`. Raises `ClosedError` if the mailbox is closed.
@[AlwaysInline]
def send(message : M) : Nil
raise ClosedError.new if @mailbox.closed?
@mailbox.send(message)
end

# Takes a previously received message. Raises if the mailbox is closed.
# Takes a previously received message. Raises `ClosedError` if the mailbox is closed.
@[AlwaysInline]
protected def receive : M
@mailbox.receive? || raise ClosedError.new
@mailbox.receive
end

# Takes a previously received message. Returns `nil` if the mailbox is closed.
@[AlwaysInline]
protected def receive? : M?
@mailbox.receive?
end

# :nodoc:
def stop : Nil
@mailbox.close if @mailbox_close_on_stop
super
@mailbox.close if @mailbox.close_on_stop?
end

# protected def reset_mailbox : Nil
# @mailbox = Queue(M).new(10)
# end
end
end
25 changes: 12 additions & 13 deletions src/pool.cr
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
require "syn/core/mutex"
require "syn/core/wait_group"
require "./artist"

module Earl
Expand All @@ -9,11 +10,11 @@ module Earl
#
# ### Workers
#
# Crashed and unexpectedly stopped workers will be recycled and restarted,
# until the pool is asked to stop.
# Workers are permanent: crashed and unexpectedly stopped workers will be
# recycled and restarted until the pool itself is told to stop.
#
# Worker agents can return as soon as possible when asked to stop, or keep
# processing their mailbox until its empty.
# processing their mailbox until there is nothing left to do (your choice).
#
# ### Mailbox
#
Expand All @@ -25,14 +26,14 @@ module Earl
def initialize(@capacity : Int32)
@workers = Array(A).new(@capacity)
@mutex = Syn::Core::Mutex.new(:unchecked)
@done = Channel(Nil).new
@group = Syn::Core::WaitGroup.new(1)
end

# Spawns workers in their dedicated `Fiber`. Blocks until all workers have
# stopped.
def call
@capacity.times do
spawn do
::spawn do
agent = A.new
@mutex.synchronize { @workers << agent }

Expand All @@ -44,11 +45,7 @@ module Earl
end
end

@done.receive?

until @workers.empty?
Fiber.yield
end
@group.wait
end

def call(message : M)
Expand Down Expand Up @@ -76,10 +73,12 @@ module Earl
@workers.each do |agent|
agent.stop rescue nil
end
@group.done
end

unless @done.closed?
@done.close
end
def recycle
@workers.clear
@group = Syn::Core::WaitGroup.new(1)
end
end
end
93 changes: 93 additions & 0 deletions src/queue.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
require "syn/core/mutex"
require "syn/core/condition_variable"
require "./errors"

module Earl
# Earl::Queue is a Channel-like object with a simpler implementation, that
# leverages Syn::Core structs, specifically targetted for Earl mailbox
# requirements.
#
# It's obviously very limited compared to Channel(T). It doesn't support
# `select`, only supports buffered queue (no sync channel with zero capacity),
# but those features aren't needed by Earl mailboxes.

# :nodoc:
class Queue(M)
property? close_on_stop : Bool = true
property? closed : Bool = false

def initialize(@backlog = 128)
{% if M == Nil %}
{% raise "Can't create an Earl::Mailbox(M) where M is Nil (or nilable)" %}
{% elsif M.union? && M.union_types.any? { |m| m == Nil } %}
{% raise "Can't create an Earl::Mailbox(M) where M is nilable" %}
{% end %}

@deque = Deque(M).new(@backlog)
@mutex = Syn::Core::Mutex.new
@readers = Syn::Core::ConditionVariable.new
@writers = Syn::Core::ConditionVariable.new
end

def send(message : M) : Nil
@mutex.synchronize do
raise ClosedError.new if @closed

until @deque.size < @backlog
@writers.wait(pointerof(@mutex))
raise ClosedError.new if @closed
end

@deque.push(message)
@readers.signal
end
end

@[AlwaysInline]
def receive : M
do_receive { raise ClosedError.new }
end

@[AlwaysInline]
def receive? : M?
do_receive { return nil }
end

private def do_receive(&) : M
@mutex.synchronize do
loop do
if message = @deque.shift?
@writers.signal
return message
end

yield if @closed

@readers.wait(pointerof(@mutex))
end
end
end

def empty? : Bool
@mutex.synchronize { @deque.empty? }
end

# def lazy_empty? : Bool
# @deque.empty?
# end

def close : Nil
return if @closed

@mutex.synchronize do
@closed = true
@readers.broadcast
@writers.broadcast
end
end

# def reset : Nil
# @closed = false
# end
end
end

0 comments on commit f753478

Please sign in to comment.