Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add EventLoop::Socket module #14643

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/crystal/system/event_loop.cr
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,15 @@ abstract class Crystal::EventLoop
end
end

abstract class Crystal::EventLoop
# The socket module is empty by default and filled with abstract defs when
# crystal/system/socket.cr is required.
module Socket
end

include Socket
end

{% if flag?(:wasi) %}
require "./wasi/event_loop"
{% elsif flag?(:unix) %}
Expand Down
66 changes: 66 additions & 0 deletions src/crystal/system/event_loop/socket.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
# This file is only required when sockets are used (`require "./event_loop/socket"` in `src/crystal/system/socket.cr`)
#
# It fills `Crystal::EventLoop::Socket` with abstract defs.

abstract class Crystal::EventLoop
module Socket
# Reads at least one byte from the socket into *slice*.
#
# Blocks the current fiber if no data is available for reading, continuing
# when available. Otherwise returns immediately.
#
# Returns the number of bytes read (up to `slice.size`).
# Returns 0 when the socket is closed and no data available.
#
# Use `#send_to` for sending a message to a specific target address.
abstract def read(socket : ::Socket, slice : Bytes) : Int32

# Writes at least one byte from *slice* to the socket.
#
# Blocks the current fiber if the socket is not ready for writing,
# continuing when ready. Otherwise returns immediately.
#
# Returns the number of bytes written (up to `slice.size`).
#
# Use `#receive_from` for capturing the source address of a message.
abstract def write(socket : ::Socket, slice : Bytes) : Int32

# Accepts an incoming TCP connection on the socket.
#
# Blocks the current fiber if no connection is waiting, continuing when one
# becomes available. Otherwise returns immediately.
#
# Returns a handle to the socket for the new connection.
abstract def accept(socket : ::Socket) : ::Socket::Handle?

# Opens a connection on *socket* to the target *address*.
#
# Blocks the current fiber and continues when the connection is established.
#
# Returns `IO::Error` in case of an error. The caller is responsible for
# raising it as an exception if necessary.
abstract def connect(socket : ::Socket, address : ::Socket::Addrinfo | ::Socket::Address, timeout : ::Time::Span?) : IO::Error?

# Sends at least one byte from *slice* to the socket with a target address
# *address*.
#
# Blocks the current fiber if the socket is not ready for writing,
# continuing when ready. Otherwise returns immediately.
#
# Returns the number of bytes sent (up to `slice.size`).
abstract def send_to(socket : ::Socket, slice : Bytes, address : ::Socket::Address) : Int32

# Receives at least one byte from the socket into *slice*, capturing the
# source address.
#
# Blocks the current fiber if no data is available for reading, continuing
# when available. Otherwise returns immediately.
#
# Returns a tuple containing the number of bytes received (up to `slice.size`)
# and the source address.
abstract def receive_from(socket : ::Socket, slice : Bytes) : Tuple(Int32, ::Socket::Address)

# Closes the socket.
abstract def close(socket : ::Socket) : Nil
end
end
30 changes: 24 additions & 6 deletions src/crystal/system/socket.cr
Original file line number Diff line number Diff line change
@@ -1,23 +1,33 @@
require "./event_loop/socket"

module Crystal::System::Socket
# Creates a file descriptor / socket handle
# private def create_handle(family, type, protocol, blocking) : Handle

# Initializes a file descriptor / socket handle for use with Crystal Socket
# private def initialize_handle(fd)

# private def system_connect(addr, timeout = nil)
private def system_connect(addr, timeout = nil)
event_loop.connect(self, addr, timeout)
end

# Tries to bind the socket to a local address.
# Yields an `Socket::BindError` if the binding failed.
# private def system_bind(addr, addrstr)

# private def system_listen(backlog)

# private def system_accept
private def system_accept
event_loop.accept(self)
end

# private def system_send_to(bytes : Bytes, addr : ::Socket::Address)
private def system_send_to(bytes : Bytes, addr : ::Socket::Address)
event_loop.send_to(self, bytes, addr)
end

# private def system_receive(bytes)
private def system_receive_from(bytes : Bytes) : Tuple(Int32, ::Socket::Address)
event_loop.receive_from(self, bytes)
end

# private def system_close_read

Expand Down Expand Up @@ -69,12 +79,20 @@ module Crystal::System::Socket

# def self.fcntl(fd, cmd, arg = 0)

# private def system_read(slice : Bytes) : Int32
private def system_read(slice : Bytes) : Int32
event_loop.read(self, slice)
end

# private def system_write(slice : Bytes) : Int32
private def system_write(slice : Bytes) : Int32
event_loop.write(self, slice)
end

# private def system_close

private def event_loop : Crystal::EventLoop::Socket
Crystal::EventLoop.current
end

# IPSocket:

# private def system_local_address
Expand Down
77 changes: 77 additions & 0 deletions src/crystal/system/unix/event_loop_libevent.cr
Original file line number Diff line number Diff line change
Expand Up @@ -75,4 +75,81 @@
end
end
end

def read(socket : ::Socket, slice : Bytes) : Int32
socket.evented_read("Error reading socket") do
LibC.recv(socket.fd, slice, slice.size, 0).to_i32
end
end

def write(socket : ::Socket, slice : Bytes) : Int32
socket.evented_write("Error writing to socket") do
LibC.send(socket.fd, slice, slice.size, 0).to_i32
end
end

def receive_from(socket : ::Socket, slice : Bytes) : Tuple(Int32, ::Socket::Address)
sockaddr = Pointer(LibC::SockaddrStorage).malloc.as(LibC::Sockaddr*)
# initialize sockaddr with the initialized family of the socket
copy = sockaddr.value
copy.sa_family = socket.family
sockaddr.value = copy

addrlen = LibC::SocklenT.new(sizeof(LibC::SockaddrStorage))

bytes_read = socket.evented_read("Error receiving datagram") do
LibC.recvfrom(socket.fd, slice, slice.size, 0, sockaddr, pointerof(addrlen))
end

{bytes_read, ::Socket::Address.from(sockaddr, addrlen)}
end

def send_to(socket : ::Socket, slice : Bytes, addr : ::Socket::Address) : Int32

Check warning on line 107 in src/crystal/system/unix/event_loop_libevent.cr

View workflow job for this annotation

GitHub Actions / LLVM 13.0.0

positional parameter 'addr' corresponds to parameter 'address' of the overridden method Crystal::EventLoop::Socket#send_to(socket : ::Socket, slice : Bytes, address : ::Socket::Address), which has a different name and may affect named argument passing

Check warning on line 107 in src/crystal/system/unix/event_loop_libevent.cr

View workflow job for this annotation

GitHub Actions / LLVM 14.0.0

positional parameter 'addr' corresponds to parameter 'address' of the overridden method Crystal::EventLoop::Socket#send_to(socket : ::Socket, slice : Bytes, address : ::Socket::Address), which has a different name and may affect named argument passing

Check warning on line 107 in src/crystal/system/unix/event_loop_libevent.cr

View workflow job for this annotation

GitHub Actions / LLVM 15.0.6

positional parameter 'addr' corresponds to parameter 'address' of the overridden method Crystal::EventLoop::Socket#send_to(socket : ::Socket, slice : Bytes, address : ::Socket::Address), which has a different name and may affect named argument passing

Check warning on line 107 in src/crystal/system/unix/event_loop_libevent.cr

View workflow job for this annotation

GitHub Actions / LLVM 16.0.3

positional parameter 'addr' corresponds to parameter 'address' of the overridden method Crystal::EventLoop::Socket#send_to(socket : ::Socket, slice : Bytes, address : ::Socket::Address), which has a different name and may affect named argument passing

Check warning on line 107 in src/crystal/system/unix/event_loop_libevent.cr

View workflow job for this annotation

GitHub Actions / LLVM 17.0.6

positional parameter 'addr' corresponds to parameter 'address' of the overridden method Crystal::EventLoop::Socket#send_to(socket : ::Socket, slice : Bytes, address : ::Socket::Address), which has a different name and may affect named argument passing

Check warning on line 107 in src/crystal/system/unix/event_loop_libevent.cr

View workflow job for this annotation

GitHub Actions / LLVM 18.1.4

positional parameter 'addr' corresponds to parameter 'address' of the overridden method Crystal::EventLoop::Socket#send_to(socket : ::Socket, slice : Bytes, address : ::Socket::Address), which has a different name and may affect named argument passing
bytes_sent = LibC.sendto(socket.fd, slice.to_unsafe.as(Void*), slice.size, 0, addr, addr.size)
raise ::Socket::Error.from_errno("Error sending datagram to #{addr}") if bytes_sent == -1
# to_i32 is fine because string/slice sizes are an Int32
bytes_sent.to_i32
end

def connect(socket : ::Socket, address : ::Socket::Addrinfo | ::Socket::Address, timeout : ::Time::Span?) : IO::Error?
loop do
if LibC.connect(socket.fd, address, address.size) == 0
return
end
case Errno.value
when Errno::EISCONN
return
when Errno::EINPROGRESS, Errno::EALREADY
socket.wait_writable(timeout: timeout) do
return IO::TimeoutError.new("connect timed out")
end
else
return ::Socket::ConnectError.from_errno("connect")
end
end
end

def accept(socket : ::Socket) : ::Socket::Handle?
loop do
client_fd = LibC.accept(socket.fd, nil, nil)
if client_fd == -1
if socket.closed?
return
elsif Errno.value == Errno::EAGAIN
socket.wait_readable(raise_if_closed: false) do
raise IO::TimeoutError.new("Accept timed out")
end
return if socket.closed?
else
raise ::Socket::Error.from_errno("accept")
end
else
return client_fd
end
end
end

def close(socket : ::Socket) : Nil
socket.evented_close
end
end
80 changes: 1 addition & 79 deletions src/crystal/system/unix/socket.cr
Original file line number Diff line number Diff line change
Expand Up @@ -26,25 +26,6 @@ module Crystal::System::Socket
{% end %}
end

private def system_connect(addr, timeout = nil)
timeout = timeout.seconds unless timeout.is_a? ::Time::Span | Nil
loop do
if LibC.connect(fd, addr, addr.size) == 0
return
end
case Errno.value
when Errno::EISCONN
return
when Errno::EINPROGRESS, Errno::EALREADY
wait_writable(timeout: timeout) do
return IO::TimeoutError.new("connect timed out")
end
else
return ::Socket::ConnectError.from_errno("connect")
end
end
end

# Tries to bind the socket to a local address.
# Yields an `Socket::BindError` if the binding failed.
private def system_bind(addr, addrstr, &)
Expand All @@ -59,53 +40,6 @@ module Crystal::System::Socket
end
end

private def system_accept
loop do
client_fd = LibC.accept(fd, nil, nil)
if client_fd == -1
if closed?
return
elsif Errno.value == Errno::EAGAIN
wait_acceptable
return if closed?
else
raise ::Socket::Error.from_errno("accept")
end
else
return client_fd
end
end
end

private def wait_acceptable
wait_readable(raise_if_closed: false) do
raise IO::TimeoutError.new("Accept timed out")
end
end

private def system_send_to(bytes : Bytes, addr : ::Socket::Address)
bytes_sent = LibC.sendto(fd, bytes.to_unsafe.as(Void*), bytes.size, 0, addr, addr.size)
raise ::Socket::Error.from_errno("Error sending datagram to #{addr}") if bytes_sent == -1
# to_i32 is fine because string/slice sizes are an Int32
bytes_sent.to_i32
end

private def system_receive(bytes)
sockaddr = Pointer(LibC::SockaddrStorage).malloc.as(LibC::Sockaddr*)
# initialize sockaddr with the initialized family of the socket
copy = sockaddr.value
copy.sa_family = family
sockaddr.value = copy

addrlen = LibC::SocklenT.new(sizeof(LibC::SockaddrStorage))

bytes_read = evented_read("Error receiving datagram") do
LibC.recvfrom(fd, bytes, bytes.size, 0, sockaddr, pointerof(addrlen))
end

{bytes_read, ::Socket::Address.from(sockaddr, addrlen)}
end

private def system_close_read
if LibC.shutdown(fd, LibC::SHUT_RD) != 0
raise ::Socket::Error.from_errno("shutdown read")
Expand Down Expand Up @@ -248,23 +182,11 @@ module Crystal::System::Socket
LibC.isatty(fd) == 1
end

private def system_read(slice : Bytes) : Int32
evented_read("Error reading socket") do
LibC.recv(fd, slice, slice.size, 0).to_i32
end
end

private def system_write(slice : Bytes) : Int32
evented_write("Error writing to socket") do
LibC.send(fd, slice, slice.size, 0)
end
end

private def system_close
# Perform libevent cleanup before LibC.close.
# Using a file descriptor after it has been closed is never defined and can
# always lead to undefined results. This is not specific to libevent.
evented_close
event_loop.close(self)

# Clear the @volatile_fd before actually closing it in order to
# reduce the chance of reading an outdated fd value
Expand Down
32 changes: 32 additions & 0 deletions src/crystal/system/wasi/event_loop.cr
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,38 @@ class Crystal::Wasi::EventLoop < Crystal::EventLoop
def create_fd_read_event(io : IO::Evented, edge_triggered : Bool = false) : Crystal::EventLoop::Event
raise NotImplementedError.new("Crystal::Wasi::EventLoop.create_fd_read_event")
end

def read(socket : ::Socket, slice : Bytes) : Int32
socket.evented_read("Error reading socket") do
LibC.recv(socket.fd, slice, slice.size, 0).to_i32
end
end

def write(socket : ::Socket, slice : Bytes) : Int32
socket.evented_write("Error writing to socket") do
LibC.send(socket.fd, slice, slice.size, 0)
end
end

def receive_from(socket : ::Socket, slice : Bytes) : Tuple(Int32, ::Socket::Address)
raise NotImplementedError.new "Crystal::Wasi::EventLoop#receive_from"
end

def send_to(socket : ::Socket, slice : Bytes, addr : ::Socket::Address) : Int32
raise NotImplementedError.new "Crystal::Wasi::EventLoop#send_to"
end

def connect(socket : ::Socket, address : ::Socket::Addrinfo | ::Socket::Address, timeout : ::Time::Span | ::Nil) : IO::Error?
raise NotImplementedError.new "Crystal::Wasi::EventLoop#connect"
end

def accept(socket : ::Socket) : ::Socket::Handle?
raise NotImplementedError.new "Crystal::Wasi::EventLoop#accept"
end

def close(socket : ::Socket) : Nil
socket.evented_close
end
end

struct Crystal::Wasi::Event
Expand Down
Loading
Loading