Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Custom Queue(M) for Mailbox(M) #19

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 7 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -168,14 +168,12 @@ properly reset themselves.

#### Mailbox

The `Earl::Mailbox(M)` module extends an agent with a `Channel(M)` along with
methods to `#send(M)` a message to an agent and to receive them (concurrency
safe).
The `Earl::Mailbox(M)` module extends an agent with a queue of `M` messages
along with methods to `#send(M)` a message to an agent and to receive them
(concurrency safe).

The module merely wraps a `Channel(M)` but proposes a standard structure for
agents to have an incoming mailbox of messages. All agents thus behave the
same, and we can assume that an agent that expects to receive messages has a
`#send(M)` method.
The goal is to provide a structured way for agents to communicate. No need to
spawn and share channels, you directly send a message to an agent for example.

An agent's mailbox will be closed when the agent is asked to stop. An agent can
simply loop over `#receive?` until it returns `nil`, without having to check for
Expand Down Expand Up @@ -305,7 +303,8 @@ I.e. they include `Earl::Mailbox(M)` or `Earl::Artist(M)`. They must also
override their `#reset` method to properly reset an agent.

Note that `Earl::Pool` will replace the workers' mailbox. All workers then share
a single `Channel(M)` for an exactly-once delivery of messages.
a single mailbox for an exactly-once delivery of messages (only one worker will
receive the message).

For example:

Expand Down
6 changes: 3 additions & 3 deletions SPEC.md
Original file line number Diff line number Diff line change
Expand Up @@ -271,8 +271,8 @@ to check for `running?`.

- `#mailbox=`

Direct accessor to swap the underlying `Channel(M)` object. The mailbox won't
be closed anymore when the agent is stopped, since the mailbox is now
Direct accessor to swap the underlying `Earl::Queue(M)` object. The mailbox
won't be closed anymore when the agent is stopped, since the mailbox is now
considered to be shared.

Despite having direct accessors to the mailbox, external agents aren't
Expand Down Expand Up @@ -522,7 +522,7 @@ to stop.

If a pool is itself supervised by an [`Earl::Supervisor`](#earlsupervisor)
agent, and the pool crashes, the supervisor will recycle and restart it, with
the original channel kept open. Pending messages will be dispatched once the
the original mailbox kept open. Pending messages will be dispatched once the
pool workers are restarted.

- `.new(capacity)`
Expand Down
8 changes: 3 additions & 5 deletions src/logger/async_dispatcher.cr
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ module Earl
module Logger
# Alternative to `Log::AsyncDispatcher` as an `Earl::Agent` that can be
# supervised.
#

# :nodoc:
class AsyncDispatcher
include ::Log::Dispatcher
Expand All @@ -14,7 +14,7 @@ module Earl
def initialize
# never close the mailbox: logs sent while the program ends could raise
# an exception because the mailbox is closed!
@mailbox_close_on_stop = false
@mailbox.close_on_stop = false
end

def finalize
Expand All @@ -39,9 +39,7 @@ module Earl

def terminate : Nil
# wait until all log messages have been processed before returning
queue = @mailbox.@queue.not_nil!

until queue.empty?
until @mailbox.empty?
sleep(0.seconds)
end
end
Expand Down
24 changes: 15 additions & 9 deletions src/mailbox.cr
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
require "./agent"
require "./queue"

module Earl
# Extends an agent with a mailbox that receives messages of type `M`.
Expand All @@ -7,37 +8,42 @@ module Earl
# take messages from the mailbox —undocumented because they're `protected`.
module Mailbox(M)
macro included
@mailbox = Channel(M).new(10)
@mailbox_close_on_stop = true
@mailbox = Earl::Queue(M).new
end

# Replaces the mailbox. The mailbox won't be closed automatically when the
# agent is asked to stop.
def mailbox=(@mailbox : Channel(M)) : Channel(M)
@mailbox_close_on_stop = false
def mailbox=(@mailbox : Queue(M)) : Queue(M)
@mailbox.close_on_stop = false
@mailbox
end

# Send a message to the `Agent`. Raises if the mailbox is closed.
# Sends a message to this `Agent`. Raises `ClosedError` if the mailbox is closed.
@[AlwaysInline]
def send(message : M) : Nil
raise ClosedError.new if @mailbox.closed?
@mailbox.send(message)
end

# Takes a previously received message. Raises if the mailbox is closed.
# Takes a previously received message. Raises `ClosedError` if the mailbox is closed.
@[AlwaysInline]
protected def receive : M
@mailbox.receive? || raise ClosedError.new
@mailbox.receive
end

# Takes a previously received message. Returns `nil` if the mailbox is closed.
@[AlwaysInline]
protected def receive? : M?
@mailbox.receive?
end

# :nodoc:
def stop : Nil
@mailbox.close if @mailbox_close_on_stop
super
@mailbox.close if @mailbox.close_on_stop?
end

# protected def reset_mailbox : Nil
# @mailbox = Queue(M).new(10)
# end
end
end
6 changes: 3 additions & 3 deletions src/pool.cr
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ module Earl
#
# ### Workers
#
# Crashed and unexpectedly stopped workers will be recycled and restarted,
# until the pool is asked to stop.
# Workers are permanent: crashed and unexpectedly stopped workers will be
# recycled and restarted until the pool itself is told to stop.
#
# Worker agents can return as soon as possible when asked to stop, or keep
# processing their mailbox until its empty.
# processing their mailbox until there is nothing left to do (your choice).
#
# ### Mailbox
#
Expand Down
93 changes: 93 additions & 0 deletions src/queue.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
require "syn/core/mutex"
require "syn/core/condition_variable"
require "./errors"

module Earl
# Earl::Queue is a Channel-like object with a simpler implementation, that
# leverages Syn::Core structs, specifically targetted for Earl mailbox
# requirements.
#
# It's obviously very limited compared to Channel(T). It doesn't support
# `select`, only supports buffered queue (no sync channel with zero capacity),
# but those features aren't needed by Earl mailboxes.

# :nodoc:
class Queue(M)
property? close_on_stop : Bool = true
property? closed : Bool = false

def initialize(@backlog = 128)
{% if M == Nil %}
{% raise "Can't create an Earl::Mailbox(M) where M is Nil (or nilable)" %}
{% elsif M.union? && M.union_types.any? { |m| m == Nil } %}
{% raise "Can't create an Earl::Mailbox(M) where M is nilable" %}
{% end %}

@deque = Deque(M).new(@backlog)
@mutex = Syn::Core::Mutex.new
@readers = Syn::Core::ConditionVariable.new
@writers = Syn::Core::ConditionVariable.new
end

def send(message : M) : Nil
@mutex.synchronize do
raise ClosedError.new if @closed

until @deque.size < @backlog
@writers.wait(pointerof(@mutex))
raise ClosedError.new if @closed
end

@deque.push(message)
@readers.signal
end
end

@[AlwaysInline]
def receive : M
do_receive { raise ClosedError.new }
end

@[AlwaysInline]
def receive? : M?
do_receive { return nil }
end

private def do_receive(&) : M
@mutex.synchronize do
loop do
if message = @deque.shift?
@writers.signal
return message
end

yield if @closed

@readers.wait(pointerof(@mutex))
end
end
end

def empty? : Bool
@mutex.synchronize { @deque.empty? }
end

# def lazy_empty? : Bool
# @deque.empty?
# end

def close : Nil
return if @closed

@mutex.synchronize do
@closed = true
@readers.broadcast
@writers.broadcast
end
end

# def reset : Nil
# @closed = false
# end
end
end