Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extract platform-specifics of Fiber, Thread and EventLoop #6955

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
File renamed without changes.
65 changes: 65 additions & 0 deletions src/crystal/event/event_loop_libevent.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
require "./event_libevent"

module Crystal::EventLoop
@@eb = Crystal::Event::Base.new
@@dns_base : Crystal::Event::DnsBase?

# Reinitializes the event loop after a fork.
def self.after_fork
@@eb.reinit
end

# Runs the event loop.
def self.resume
loop_fiber.resume
end

private def self.loop_fiber
@@loop_fiber ||= Fiber.new { @@eb.run_loop }
end

# Create a new resume event for a fiber.
def self.create_resume_event(fiber)
@@eb.new_event(-1, LibEvent2::EventFlags::None, fiber) do |s, flags, data|
data.as(Fiber).resume
end
end

# Creates a write event for a file descriptor.
def self.create_fd_write_event(io : IO::Evented, edge_triggered : Bool = false)
flags = LibEvent2::EventFlags::Write
flags |= LibEvent2::EventFlags::Persist | LibEvent2::EventFlags::ET if edge_triggered

@@eb.new_event(io.fd, flags, io) do |s, flags, data|
io_ref = data.as(typeof(io))
if flags.includes?(LibEvent2::EventFlags::Write)
io_ref.resume_write
elsif flags.includes?(LibEvent2::EventFlags::Timeout)
io_ref.resume_write(timed_out: true)
end
end
end

# Creates a read event for a file descriptor.
def self.create_fd_read_event(io : IO::Evented, edge_triggered : Bool = false)
flags = LibEvent2::EventFlags::Read
flags |= LibEvent2::EventFlags::Persist | LibEvent2::EventFlags::ET if edge_triggered

@@eb.new_event(io.fd, flags, io) do |s, flags, data|
io_ref = data.as(typeof(io))
if flags.includes?(LibEvent2::EventFlags::Read)
io_ref.resume_read
elsif flags.includes?(LibEvent2::EventFlags::Timeout)
io_ref.resume_read(timed_out: true)
end
end
end

private def self.dns_base
@@dns_base ||= @@eb.new_dns_base
end

def self.create_dns_request(nodename, servname, hints, data, &callback : LibEvent2::DnsGetAddrinfoCallback)
dns_base.getaddrinfo(nodename, servname, hints, data, &callback)
end
end
File renamed without changes.
74 changes: 22 additions & 52 deletions src/crystal/event_loop.cr
Original file line number Diff line number Diff line change
@@ -1,60 +1,30 @@
require "./event"

module Crystal::EventLoop
@@eb = Crystal::Event::Base.new
@@dns_base : Crystal::Event::DnsBase?

def self.after_fork
@@eb.reinit
end

def self.resume
loop_fiber.resume
end

private def self.loop_fiber
@@loop_fiber ||= Fiber.new { @@eb.run_loop }
end
# Runs the event loop.
# def self.resume : Nil

def self.create_resume_event(fiber)
@@eb.new_event(-1, LibEvent2::EventFlags::None, fiber) do |s, flags, data|
data.as(Fiber).resume
end
end
# Reinitializes the event loop after a fork.
# def self.after_fork : Nil

def self.create_fd_write_event(io : IO::Evented, edge_triggered : Bool = false)
flags = LibEvent2::EventFlags::Write
flags |= LibEvent2::EventFlags::Persist | LibEvent2::EventFlags::ET if edge_triggered
# Create a new resume event for a fiber.
# def self.create_resume_event(fiber : Fiber) : Crystal::Event

@@eb.new_event(io.fd, flags, io) do |s, flags, data|
io_ref = data.as(typeof(io))
if flags.includes?(LibEvent2::EventFlags::Write)
io_ref.resume_write
elsif flags.includes?(LibEvent2::EventFlags::Timeout)
io_ref.resume_write(timed_out: true)
end
end
end
# Creates a write event for a file descriptor.
# def self.create_fd_write_event(io : IO::Evented, edge_triggered : Bool = false)

def self.create_fd_read_event(io : IO::Evented, edge_triggered : Bool = false)
flags = LibEvent2::EventFlags::Read
flags |= LibEvent2::EventFlags::Persist | LibEvent2::EventFlags::ET if edge_triggered

@@eb.new_event(io.fd, flags, io) do |s, flags, data|
io_ref = data.as(typeof(io))
if flags.includes?(LibEvent2::EventFlags::Read)
io_ref.resume_read
elsif flags.includes?(LibEvent2::EventFlags::Timeout)
io_ref.resume_read(timed_out: true)
end
end
end
# Creates a read event for a file descriptor.
# def self.create_fd_read_event(io : IO::Evented, edge_triggered : Bool = false)
end

private def self.dns_base
@@dns_base ||= @@eb.new_dns_base
end
struct Crystal::Event
# Frees the event.
# def free : Nil

def self.create_dns_request(nodename, servname, hints, data, &callback : LibEvent2::DnsGetAddrinfoCallback)
dns_base.getaddrinfo(nodename, servname, hints, data, &callback)
end
# Adds a new timeout to this event.
# def add(time_span : Time::Span?) : Nil
end

{% if flag?(:unix) %}
require "./event/event_loop_libevent"
{% else %}
{% raise "event_loop not supported" %}
{% end %}
16 changes: 16 additions & 0 deletions src/crystal/system/fiber.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
module Crystal::System::Fiber
# Allocates memory for a stack.
# def self.allocate_stack(stack_size : Int) : Void*

# Frees memory of a stack.
# def self.free_stack(stack : Void*, stack_size : Int) : Nil

# Determines location of the top of the main process fiber's stack.
# def self.main_fiber_stack(stack_bottom : Void*) : Void*
end

{% if flag?(:unix) %}
require "./unix/fiber"
{% else %}
{% raise "fiber not supported" %}
{% end %}
24 changes: 24 additions & 0 deletions src/crystal/system/unix/fiber.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
require "c/sys/mman"

module Crystal::System::Fiber
def self.allocate_stack(stack_size) : Void*
flags = LibC::MAP_PRIVATE | LibC::MAP_ANON
{% if flag?(:openbsd) && !flag?(:"openbsd6.2") %}
flags |= LibC::MAP_STACK
{% end %}

pointer = LibC.mmap(nil, stack_size, LibC::PROT_READ | LibC::PROT_WRITE, flags, -1, 0)
raise Errno.new("Cannot allocate new fiber stack") if pointer == LibC::MAP_FAILED

{% if flag?(:linux) %}
LibC.madvise(pointer, stack_size, LibC::MADV_NOHUGEPAGE)
{% end %}

LibC.mprotect(pointer, 4096, LibC::PROT_NONE)
pointer
end

def self.free_stack(stack : Void*, stack_size) : Nil
LibC.munmap(stack, stack_size)
end
end
2 changes: 1 addition & 1 deletion src/crystal/system/unix/getrandom.cr
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ module Crystal::System::Random
loop do
read_bytes = LibC.syscall(LibC::SYS_getrandom, buf, LibC::SizeT.new(buf.size), 0)
if read_bytes < 0 && (Errno.value == Errno::EINTR || Errno.value == Errno::EAGAIN)
Fiber.yield
::Fiber.yield
else
return read_bytes
end
Expand Down
1 change: 0 additions & 1 deletion src/fiber.cr
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
require "c/sys/mman"
require "thread/linked_list"
require "./fiber/context"
require "./fiber/stack_pool"
Expand Down
23 changes: 4 additions & 19 deletions src/fiber/stack_pool.cr
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
require "crystal/system/fiber"

class Fiber
# :nodoc:
class StackPool
Expand All @@ -12,7 +14,7 @@ class Fiber
def collect(count = lazy_size // 2)
count.times do
if stack = @deque.shift?
LibC.munmap(stack, STACK_SIZE)
Crystal::System::Fiber.free_stack(stack, STACK_SIZE)
else
return
end
Expand All @@ -21,7 +23,7 @@ class Fiber

# Removes a stack from the bottom of the pool, or allocates a new one.
def checkout
stack = @deque.pop? || allocate
stack = @deque.pop? || Crystal::System::Fiber.allocate_stack(STACK_SIZE)
{stack, stack + STACK_SIZE}
end

Expand All @@ -35,22 +37,5 @@ class Fiber
def lazy_size
@deque.size
end

private def allocate
flags = LibC::MAP_PRIVATE | LibC::MAP_ANON
{% if flag?(:openbsd) && !flag?(:"openbsd6.2") %}
flags |= LibC::MAP_STACK
{% end %}

pointer = LibC.mmap(nil, STACK_SIZE, LibC::PROT_READ | LibC::PROT_WRITE, flags, -1, 0)
raise Errno.new("Cannot allocate new fiber stack") if pointer == LibC::MAP_FAILED

{% if flag?(:linux) %}
LibC.madvise(pointer, STACK_SIZE, LibC::MADV_NOHUGEPAGE)
{% end %}

LibC.mprotect(pointer, 4096, LibC::PROT_NONE)
pointer
end
end
end
Loading