From 1609f43eafe460372ee13cb91ad9ac04b3e90ab5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johannes=20M=C3=BCller?= Date: Wed, 12 May 2021 19:25:18 +0200 Subject: [PATCH 1/3] Extract system specifics from Socket --- src/crystal/system/socket.cr | 63 ++++++++ src/crystal/system/unix/socket.cr | 241 +++++++++++++++++++++++++++++ src/socket.cr | 245 ++++++------------------------ src/socket/tcp_server.cr | 4 +- src/socket/udp_socket.cr | 4 +- src/socket/unix_server.cr | 4 +- 6 files changed, 354 insertions(+), 207 deletions(-) create mode 100644 src/crystal/system/socket.cr create mode 100644 src/crystal/system/unix/socket.cr diff --git a/src/crystal/system/socket.cr b/src/crystal/system/socket.cr new file mode 100644 index 000000000000..fe9cf321633c --- /dev/null +++ b/src/crystal/system/socket.cr @@ -0,0 +1,63 @@ +module Crystal::System::Socket + # Creates a file descriptor / socket handle + # private def create_handle(family, type, protocol, blocking) : Handle + + # Initializes a file descriptor / socket handle for use with Crystal Socket + # private def initialize_handle(fd) + + # private def system_connect(addr, timeout = nil) + + # Tries to bind the socket to a local address. + # Yields an `Socket::BindError` if the binding failed. + # private def system_bind(addr, addrstr) + + # private def system_listen(backlog) + + # private def system_accept + + # private def system_send(bytes : Bytes) : Int32 + + # private def system_send_to(bytes : Bytes, addr : ::Socket::Address) + + # private def system_receive(bytes) + + # private def system_close_read + + # private def system_close_write + + # private def system_reuse_port? + + # private def system_reuse_port=(val : Bool) + + # private def system_linger + + # private def system_linger=(val) + + # private def system_getsockopt(fd, optname, optval, level = LibC::SOL_SOCKET) + + # private def system_setsockopt(fd, optname, optval, level = LibC::SOL_SOCKET) + + # private def system_blocking? + + # private def system_blocking=(value) + + # private def system_tty? + + # private def system_close_on_exec? + + # private def system_close_on_exec=(arg : Bool) + + # def self.fcntl(fd, cmd, arg = 0) + + # private def unbuffered_read(slice : Bytes) + + # private def unbuffered_write(slice : Bytes) + + # private def system_close +end + +{% if flag?(:unix) %} + require "./unix/socket" +{% else %} + {% raise "No Crystal::System::Socket implementation available" %} +{% end %} diff --git a/src/crystal/system/unix/socket.cr b/src/crystal/system/unix/socket.cr new file mode 100644 index 000000000000..2982c857747c --- /dev/null +++ b/src/crystal/system/unix/socket.cr @@ -0,0 +1,241 @@ +require "c/netdb" +require "c/netinet/tcp" +require "c/sys/socket" +require "io/evented" + +module Crystal::System::Socket + include IO::Evented + + alias Handle = Int32 + + private def create_handle(family, type, protocol, blocking) : Handle + fd = LibC.socket(family, type, protocol) + raise ::Socket::Error.from_errno("Failed to create socket") if fd == -1 + fd + end + + private def initialize_handle(fd) + {% unless LibC.has_constant?(:SOCK_CLOEXEC) %} + # Forces opened sockets to be closed on `exec(2)`. Only for platforms that don't + # support `SOCK_CLOEXEC` (e.g., Darwin). + LibC.fcntl(fd, LibC::F_SETFD, LibC::FD_CLOEXEC) + {% end %} + end + + private def system_connect(addr, timeout = nil) + timeout = timeout.seconds unless timeout.is_a? ::Time::Span | Nil + loop do + if LibC.connect(fd, addr, addr.size) == 0 + return + end + case Errno.value + when Errno::EISCONN + return + when Errno::EINPROGRESS, Errno::EALREADY + wait_writable(timeout: timeout) do + return yield IO::TimeoutError.new("connect timed out") + end + else + return yield ::Socket::ConnectError.from_errno("connect") + end + end + end + + # Tries to bind the socket to a local address. + # Yields an `Socket::BindError` if the binding failed. + private def system_bind(addr, addrstr) + unless LibC.bind(fd, addr, addr.size) == 0 + yield ::Socket::BindError.from_errno("Could not bind to '#{addrstr}'") + end + end + + private def system_listen(backlog) + unless LibC.listen(fd, backlog) == 0 + yield ::Socket::Error.from_errno("Listen failed") + end + end + + private def system_accept + loop do + client_fd = LibC.accept(fd, nil, nil) + if client_fd == -1 + if closed? + return + elsif Errno.value == Errno::EAGAIN + wait_acceptable + return if closed? + else + raise ::Socket::Error.from_errno("accept") + end + else + return client_fd + end + end + end + + private def wait_acceptable + wait_readable(raise_if_closed: false) do + raise IO::TimeoutError.new("Accept timed out") + end + end + + private def system_send(bytes : Bytes) : Int32 + evented_send(bytes, "Error sending datagram") do |slice| + LibC.send(fd, slice.to_unsafe.as(Void*), slice.size, 0) + end + end + + private def system_send_to(bytes : Bytes, addr : ::Socket::Address) + bytes_sent = LibC.sendto(fd, bytes.to_unsafe.as(Void*), bytes.size, 0, addr, addr.size) + raise ::Socket::Error.from_errno("Error sending datagram to #{addr}") if bytes_sent == -1 + # to_i32 is fine because string/slice sizes are an Int32 + bytes_sent.to_i32 + end + + private def system_receive(bytes) + sockaddr = Pointer(LibC::SockaddrStorage).malloc.as(LibC::Sockaddr*) + # initialize sockaddr with the initialized family of the socket + copy = sockaddr.value + copy.sa_family = family + sockaddr.value = copy + + addrlen = LibC::SocklenT.new(sizeof(LibC::SockaddrStorage)) + + bytes_read = evented_read(bytes, "Error receiving datagram") do |slice| + LibC.recvfrom(fd, slice, slice.size, 0, sockaddr, pointerof(addrlen)) + end + + {bytes_read, sockaddr, addrlen} + end + + private def system_close_read + if LibC.shutdown(fd, LibC::SHUT_RD) != 0 + raise ::Socket::Error.from_errno("shutdown read") + end + end + + private def system_close_write + if LibC.shutdown(fd, LibC::SHUT_WR) != 0 + raise ::Socket::Error.from_errno("shutdown write") + end + end + + private def system_reuse_port? + system_getsockopt(fd, LibC::SO_REUSEPORT, 0) do |value| + return value != 0 + end + + if Errno.value == Errno::ENOPROTOOPT + return false + else + raise ::Socket::Error.from_errno("getsockopt") + end + end + + private def system_reuse_port=(val : Bool) + setsockopt_bool LibC::SO_REUSEPORT, val + end + + private def system_linger + v = LibC::Linger.new + ret = getsockopt LibC::SO_LINGER, v + ret.l_onoff == 0 ? nil : ret.l_linger + end + + private def system_linger=(val) + v = LibC::Linger.new + case val + when Int + v.l_onoff = 1 + v.l_linger = val + when nil + v.l_onoff = 0 + end + + setsockopt LibC::SO_LINGER, v + val + end + + private def system_getsockopt(fd, optname, optval, level = LibC::SOL_SOCKET) + optsize = LibC::SocklenT.new(sizeof(typeof(optval))) + ret = LibC.getsockopt(fd, level, optname, pointerof(optval), pointerof(optsize)) + yield optval if ret == 0 + ret + end + + private def system_setsockopt(fd, optname, optval, level = LibC::SOL_SOCKET) + optsize = LibC::SocklenT.new(sizeof(typeof(optval))) + + ret = LibC.setsockopt(fd, level, optname, pointerof(optval), optsize) + raise ::Socket::Error.from_errno("setsockopt") if ret == -1 + ret + end + + private def system_blocking? + fcntl(LibC::F_GETFL) & LibC::O_NONBLOCK == 0 + end + + private def system_blocking=(value) + flags = fcntl(LibC::F_GETFL) + if value + flags &= ~LibC::O_NONBLOCK + else + flags |= LibC::O_NONBLOCK + end + fcntl(LibC::F_SETFL, flags) + end + + private def system_close_on_exec? + flags = fcntl(LibC::F_GETFD) + (flags & LibC::FD_CLOEXEC) == LibC::FD_CLOEXEC + end + + private def system_close_on_exec=(arg : Bool) + fcntl(LibC::F_SETFD, arg ? LibC::FD_CLOEXEC : 0) + arg + end + + def self.fcntl(fd, cmd, arg = 0) + r = LibC.fcntl fd, cmd, arg + raise ::Socket::Error.from_errno("fcntl() failed") if r == -1 + r + end + + private def system_tty? + LibC.isatty(fd) == 1 + end + + private def unbuffered_read(slice : Bytes) + evented_read(slice, "Error reading socket") do + LibC.recv(fd, slice, slice.size, 0).to_i32 + end + end + + private def unbuffered_write(slice : Bytes) + evented_write(slice, "Error writing to socket") do |slice| + LibC.send(fd, slice, slice.size, 0) + end + end + + private def system_close + # Perform libevent cleanup before LibC.close. + # Using a file descriptor after it has been closed is never defined and can + # always lead to undefined results. This is not specific to libevent. + evented_close + + # Clear the @volatile_fd before actually closing it in order to + # reduce the chance of reading an outdated fd value + fd = @volatile_fd.swap(-1) + + ret = LibC.close(fd) + + if ret != 0 + case Errno.value + when Errno::EINTR, Errno::EINPROGRESS + # ignore + else + raise ::Socket::Error.from_errno("Error closing socket") + end + end + end +end diff --git a/src/socket.cr b/src/socket.cr index 3bbb38910cda..a3e4e710123f 100644 --- a/src/socket.cr +++ b/src/socket.cr @@ -1,12 +1,8 @@ -require "c/netdb" -require "c/netinet/in" -require "c/netinet/tcp" -require "c/sys/socket" -require "io/evented" +require "crystal/system/socket" class Socket < IO include IO::Buffered - include IO::Evented + include Crystal::System::Socket enum Type STREAM = LibC::SOCK_STREAM @@ -18,9 +14,13 @@ class Socket < IO # :nodoc: SOMAXCONN = 128 - @volatile_fd : Atomic(Int32) + @volatile_fd : Atomic(Handle) - def fd : Int32 + # Returns the handle associated with this socket from the operating system. + # + # * on POSIX platforms, this is a file descriptor (`Int32`) + # * on Windows, this is a SOCKET handle (`LibC::SOCKET`) + def fd @volatile_fd.get end @@ -48,24 +48,19 @@ class Socket < IO new(Family::UNIX, type, blocking: blocking) end - def initialize(@family, @type, @protocol = Protocol::IP, blocking = false) - @closed = false - fd = LibC.socket(family, type, protocol) - raise Socket::Error.from_errno("Failed to create socket") if fd == -1 - init_close_on_exec(fd) - @volatile_fd = Atomic.new(fd) + def initialize(family : Family, type : Type, protocol : Protocol = Protocol::IP, blocking = false) + # This method is `#initialize` instead of `.new` because it is used as super + # constructor from subclasses. - self.sync = true - unless blocking - self.blocking = false - end + fd = create_handle(family, type, protocol, blocking) + initialize(fd, family, type, protocol, blocking) end - # Creates a Socket from an existing socket file descriptor. - def initialize(fd : Int32, @family, @type, @protocol = Protocol::IP, blocking = false) + # Creates a Socket from an existing socket file descriptor / handle. + def initialize(fd, @family : Family, @type : Type, @protocol : Protocol = Protocol::IP, blocking = false) @volatile_fd = Atomic.new(fd) @closed = false - init_close_on_exec(fd) + initialize_handle(fd) self.sync = true unless blocking @@ -73,14 +68,6 @@ class Socket < IO end end - # Forces opened sockets to be closed on `exec(2)`. Only for platforms that don't - # support `SOCK_CLOEXEC` (e.g., Darwin). - protected def init_close_on_exec(fd : Int32) - {% unless LibC.has_constant?(:SOCK_CLOEXEC) %} - LibC.fcntl(fd, LibC::F_SETFD, LibC::FD_CLOEXEC) - {% end %} - end - # Connects the socket to a remote host:port. # # ``` @@ -109,23 +96,8 @@ class Socket < IO # Tries to connect to a remote address. Yields an `IO::TimeoutError` or an # `Socket::ConnectError` error if the connection failed. - def connect(addr, timeout = nil) - timeout = timeout.seconds unless timeout.is_a? Time::Span | Nil - loop do - if LibC.connect(fd, addr, addr.size) == 0 - return - end - case Errno.value - when Errno::EISCONN - return - when Errno::EINPROGRESS, Errno::EALREADY - wait_writable(timeout: timeout) do - return yield IO::TimeoutError.new("connect timed out") - end - else - return yield Socket::ConnectError.from_errno("connect") - end - end + def connect(addr, timeout = nil, &) + system_connect(addr, timeout) { |error| yield error } end # Binds the socket to a local address. @@ -138,7 +110,7 @@ class Socket < IO # ``` def bind(host : String, port : Int) Addrinfo.resolve(host, port, @family, @type, @protocol) do |addrinfo| - bind(addrinfo, "#{host}:#{port}") { |errno| errno } + system_bind(addrinfo, "#{host}:#{port}") { |errno| errno } end end @@ -152,7 +124,7 @@ class Socket < IO # ``` def bind(port : Int) Addrinfo.resolve("::", port, @family, @type, @protocol) do |addrinfo| - bind(addrinfo, "::#{port}") { |errno| errno } + system_bind(addrinfo, "::#{port}") { |errno| errno } end end @@ -165,15 +137,7 @@ class Socket < IO # sock.bind Socket::IPAddress.new("192.168.1.25", 80) # ``` def bind(addr : Socket::Address) - bind(addr, addr.to_s) { |errno| raise errno } - end - - # Tries to bind the socket to a local address. - # Yields an `Socket::BindError` if the binding failed. - private def bind(addr, addrstr) - unless LibC.bind(fd, addr, addr.size) == 0 - yield BindError.from_errno("Could not bind to '#{addrstr}'") - end + system_bind(addr, addr.to_s) { |errno| raise errno } end # Tells the previously bound socket to listen for incoming connections. @@ -184,9 +148,7 @@ class Socket < IO # Tries to listen for connections on the previously bound socket. # Yields an `Socket::Error` on failure. def listen(backlog : Int = SOMAXCONN) - unless LibC.listen(fd, backlog) == 0 - yield Socket::Error.from_errno("Listen failed") - end + system_listen(backlog) { |err| yield err } end # Accepts an incoming connection. @@ -221,37 +183,13 @@ class Socket < IO # end # ``` def accept? - if client_fd = accept_impl + if client_fd = system_accept sock = Socket.new(client_fd, family, type, protocol, blocking) sock.sync = sync? sock end end - protected def accept_impl - loop do - client_fd = LibC.accept(fd, nil, nil) - if client_fd == -1 - if closed? - return - elsif Errno.value == Errno::EAGAIN - wait_acceptable - return if closed? - else - raise Socket::Error.from_errno("accept") - end - else - return client_fd - end - end - end - - private def wait_acceptable - wait_readable(raise_if_closed: false) do - raise TimeoutError.new("Accept timed out") - end - end - # Sends a message to a previously connected remote address. # # ``` @@ -266,9 +204,7 @@ class Socket < IO # sock.send(Bytes[0]) # ``` def send(message) : Int32 - evented_send(message.to_slice, "Error sending datagram") do |slice| - LibC.send(fd, slice.to_unsafe.as(Void*), slice.size, 0) - end + system_send(message.to_slice) end # Sends a message to the specified remote address. @@ -282,11 +218,7 @@ class Socket < IO # sock.send("text query", to: server) # ``` def send(message, to addr : Address) : Int32 - slice = message.to_slice - bytes_sent = LibC.sendto(fd, slice.to_unsafe.as(Void*), slice.size, 0, addr, addr.size) - raise Socket::Error.from_errno("Error sending datagram to #{addr}") if bytes_sent == -1 - # to_i32 is fine because string/slice sizes are an Int32 - bytes_sent.to_i32 + system_send_to(message.to_slice, addr) end # Receives a text message from the previously bound address. @@ -302,7 +234,7 @@ class Socket < IO def receive(max_message_size = 512) : {String, Address} address = nil message = String.new(max_message_size) do |buffer| - bytes_read, sockaddr, addrlen = recvfrom(Slice.new(buffer, max_message_size)) + bytes_read, sockaddr, addrlen = system_receive(Slice.new(buffer, max_message_size)) address = Address.from(sockaddr, addrlen) {bytes_read, 0} end @@ -321,40 +253,18 @@ class Socket < IO # bytes_read, client_addr = server.receive(message) # ``` def receive(message : Bytes) : {Int32, Address} - bytes_read, sockaddr, addrlen = recvfrom(message) + bytes_read, sockaddr, addrlen = system_receive(message) {bytes_read, Address.from(sockaddr, addrlen)} end - protected def recvfrom(bytes) - sockaddr = Pointer(LibC::SockaddrStorage).malloc.as(LibC::Sockaddr*) - # initialize sockaddr with the initialized family of the socket - copy = sockaddr.value - copy.sa_family = family - sockaddr.value = copy - - addrlen = LibC::SocklenT.new(sizeof(LibC::SockaddrStorage)) - - bytes_read = evented_read(bytes, "Error receiving datagram") do |slice| - LibC.recvfrom(fd, slice.to_unsafe.as(Void*), slice.size, 0, sockaddr, pointerof(addrlen)) - end - - {bytes_read, sockaddr, addrlen} - end - # Calls `shutdown(2)` with `SHUT_RD` def close_read - shutdown LibC::SHUT_RD + system_close_read end # Calls `shutdown(2)` with `SHUT_WR` def close_write - shutdown LibC::SHUT_WR - end - - private def shutdown(how) - if LibC.shutdown(fd, how) != 0 - raise Socket::Error.from_errno("shutdown #{how}") - end + system_close_write end def inspect(io : IO) : Nil @@ -388,19 +298,11 @@ class Socket < IO end def reuse_port? - getsockopt(LibC::SO_REUSEPORT, 0) do |value| - return value != 0 - end - - if Errno.value == Errno::ENOPROTOOPT - return false - else - raise Socket::Error.from_errno("getsockopt") - end + system_reuse_port? end def reuse_port=(val : Bool) - setsockopt_bool LibC::SO_REUSEPORT, val + self.system_reuse_port = val end def broadcast? @@ -420,9 +322,7 @@ class Socket < IO end def linger - v = LibC::Linger.new - ret = getsockopt LibC::SO_LINGER, v - ret.l_onoff == 0 ? nil : ret.l_linger + system_linger end # WARNING: The behavior of `SO_LINGER` is platform specific. @@ -435,17 +335,7 @@ class Socket < IO # * `0`: abort on close (socket buffer is discarded and RST sent to peer). Depends on platform and whether `shutdown()` was called first. # * `>=1`: abort after `Int` seconds on close. Linux and Cygwin may block on close. def linger=(val : Int?) - v = LibC::Linger.new - case val - when Int - v.l_onoff = 1 - v.l_linger = val - when nil - v.l_onoff = 0 - end - - setsockopt LibC::SO_LINGER, v - val + self.system_linger = val end # Returns the modified *optval*. @@ -455,18 +345,11 @@ class Socket < IO end protected def getsockopt(optname, optval, level = LibC::SOL_SOCKET) - optsize = LibC::SocklenT.new(sizeof(typeof(optval))) - ret = LibC.getsockopt(fd, level, optname, (pointerof(optval).as(Void*)), pointerof(optsize)) - yield optval if ret == 0 - ret + system_getsockopt(fd, optname, optval, level) { |value| yield value } end - # NOTE: *optval* is restricted to `Int32` until sizeof works on variables. - def setsockopt(optname, optval, level = LibC::SOL_SOCKET) - optsize = LibC::SocklenT.new(sizeof(typeof(optval))) - ret = LibC.setsockopt(fd, level, optname, (pointerof(optval).as(Void*)), optsize) - raise Socket::Error.from_errno("setsockopt") if ret == -1 - ret + protected def setsockopt(optname, optval, level = LibC::SOL_SOCKET) + system_setsockopt(fd, optname, optval, level) end private def getsockopt_bool(optname, level = LibC::SOL_SOCKET) @@ -476,38 +359,28 @@ class Socket < IO private def setsockopt_bool(optname, optval : Bool, level = LibC::SOL_SOCKET) v = optval ? 1 : 0 - ret = setsockopt optname, v, level + setsockopt optname, v, level optval end def blocking - fcntl(LibC::F_GETFL) & LibC::O_NONBLOCK == 0 + system_blocking? end def blocking=(value) - flags = fcntl(LibC::F_GETFL) - if value - flags &= ~LibC::O_NONBLOCK - else - flags |= LibC::O_NONBLOCK - end - fcntl(LibC::F_SETFL, flags) + self.system_blocking = value end def close_on_exec? - flags = fcntl(LibC::F_GETFD) - (flags & LibC::FD_CLOEXEC) == LibC::FD_CLOEXEC + system_close_on_exec? end def close_on_exec=(arg : Bool) - fcntl(LibC::F_SETFD, arg ? LibC::FD_CLOEXEC : 0) - arg + system_close_on_exec = arg end def self.fcntl(fd, cmd, arg = 0) - r = LibC.fcntl fd, cmd, arg - raise Socket::Error.from_errno("fcntl() failed") if r == -1 - r + Crystal::System::Socket.fcntl(fd, cmd, arg) end def fcntl(cmd, arg = 0) @@ -525,19 +398,7 @@ class Socket < IO end def tty? - LibC.isatty(fd) == 1 - end - - private def unbuffered_read(slice : Bytes) - evented_read(slice, "Error reading socket") do - LibC.recv(fd, slice, slice.size, 0).to_i32 - end - end - - private def unbuffered_write(slice : Bytes) - evented_write(slice, "Error writing to socket") do |slice| - LibC.send(fd, slice, slice.size, 0) - end + system_tty? end private def unbuffered_rewind @@ -547,27 +408,9 @@ class Socket < IO private def unbuffered_close return if @closed - # Perform libevent cleanup before LibC.close. - # Using a file descriptor after it has been closed is never defined and can - # always lead to undefined results. This is not specific to libevent. @closed = true - evented_close - - # Clear the @volatile_fd before actually closing it in order to - # reduce the chance of reading an outdated fd value - _fd = @volatile_fd.swap(-1) - - err = nil - if LibC.close(_fd) != 0 - case Errno.value - when Errno::EINTR, Errno::EINPROGRESS - # ignore - else - err = Socket::Error.from_errno("Error closing socket") - end - end - raise err if err + system_close end private def unbuffered_flush diff --git a/src/socket/tcp_server.cr b/src/socket/tcp_server.cr index bfecdd146b5a..56f895c12cda 100644 --- a/src/socket/tcp_server.cr +++ b/src/socket/tcp_server.cr @@ -36,7 +36,7 @@ class TCPServer < TCPSocket self.reuse_address = true self.reuse_port = true if reuse_port - if errno = bind(addrinfo, "#{host}:#{port}") { |errno| errno } + if errno = system_bind(addrinfo, "#{host}:#{port}") { |errno| errno } close next errno end @@ -104,7 +104,7 @@ class TCPServer < TCPSocket # end # ``` def accept? : TCPSocket? - if client_fd = accept_impl + if client_fd = system_accept sock = TCPSocket.new(fd: client_fd, family: family, type: type, protocol: protocol) sock.sync = sync? sock diff --git a/src/socket/udp_socket.cr b/src/socket/udp_socket.cr index 5f25c6b1ce30..d667e9196a22 100644 --- a/src/socket/udp_socket.cr +++ b/src/socket/udp_socket.cr @@ -68,7 +68,7 @@ class UDPSocket < IPSocket def receive(max_message_size = 512) : {String, IPAddress} address = nil message = String.new(max_message_size) do |buffer| - bytes_read, sockaddr, addrlen = recvfrom(Slice.new(buffer, max_message_size)) + bytes_read, sockaddr, addrlen = system_receive(Slice.new(buffer, max_message_size)) address = IPAddress.from(sockaddr, addrlen) {bytes_read, 0} end @@ -87,7 +87,7 @@ class UDPSocket < IPSocket # bytes_read, client_addr = server.receive(message) # ``` def receive(message : Bytes) : {Int32, IPAddress} - bytes_read, sockaddr, addrlen = recvfrom(message) + bytes_read, sockaddr, addrlen = system_receive(message) {bytes_read, IPAddress.from(sockaddr, addrlen)} end diff --git a/src/socket/unix_server.cr b/src/socket/unix_server.cr index 35fd53646ec0..cc79ef35d20d 100644 --- a/src/socket/unix_server.cr +++ b/src/socket/unix_server.cr @@ -34,7 +34,7 @@ class UNIXServer < UNIXSocket def initialize(@path : String, type : Type = Type::STREAM, backlog : Int = 128) super(Family::UNIX, type) - bind(UNIXAddress.new(path), path) do |error| + system_bind(UNIXAddress.new(path), path) do |error| close(delete: false) raise error end @@ -68,7 +68,7 @@ class UNIXServer < UNIXSocket # Returns the client socket or `nil` if the server is closed after invoking # this method. def accept? : UNIXSocket? - if client_fd = accept_impl + if client_fd = system_accept sock = UNIXSocket.new(fd: client_fd, type: type, path: @path) sock.sync = sync? sock From 6cdaf80f36fcc5b45fcc03dea1b3362e139dd34f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johannes=20M=C3=BCller?= Date: Mon, 17 May 2021 15:26:42 +0200 Subject: [PATCH 2/3] Extract system-specifics from IPSocket --- src/crystal/system/socket.cr | 6 ++++++ src/crystal/system/unix/socket.cr | 24 ++++++++++++++++++++++++ src/socket/ip_socket.cr | 20 ++------------------ 3 files changed, 32 insertions(+), 18 deletions(-) diff --git a/src/crystal/system/socket.cr b/src/crystal/system/socket.cr index fe9cf321633c..c41bae1716e1 100644 --- a/src/crystal/system/socket.cr +++ b/src/crystal/system/socket.cr @@ -54,6 +54,12 @@ module Crystal::System::Socket # private def unbuffered_write(slice : Bytes) # private def system_close + + # IPSocket: + + # private def system_local_address + + # private def system_remote_address end {% if flag?(:unix) %} diff --git a/src/crystal/system/unix/socket.cr b/src/crystal/system/unix/socket.cr index 2982c857747c..d4982d3b918f 100644 --- a/src/crystal/system/unix/socket.cr +++ b/src/crystal/system/unix/socket.cr @@ -238,4 +238,28 @@ module Crystal::System::Socket end end end + + private def system_local_address + sockaddr6 = uninitialized LibC::SockaddrIn6 + sockaddr = pointerof(sockaddr6).as(LibC::Sockaddr*) + addrlen = sizeof(LibC::SockaddrIn6).to_u32! + + if LibC.getsockname(fd, sockaddr, pointerof(addrlen)) != 0 + raise ::Socket::Error.from_errno("getsockname") + end + + ::Socket::IPAddress.from(sockaddr, addrlen) + end + + private def system_remote_address + sockaddr6 = uninitialized LibC::SockaddrIn6 + sockaddr = pointerof(sockaddr6).as(LibC::Sockaddr*) + addrlen = sizeof(LibC::SockaddrIn6).to_u32! + + if LibC.getpeername(fd, sockaddr, pointerof(addrlen)) != 0 + raise ::Socket::Error.from_errno("getpeername") + end + + ::Socket::IPAddress.from(sockaddr, addrlen) + end end diff --git a/src/socket/ip_socket.cr b/src/socket/ip_socket.cr index ea59b9fcc6c3..bb9c11a45ef7 100644 --- a/src/socket/ip_socket.cr +++ b/src/socket/ip_socket.cr @@ -1,27 +1,11 @@ class IPSocket < Socket # Returns the `IPAddress` for the local end of the IP socket. def local_address - sockaddr6 = uninitialized LibC::SockaddrIn6 - sockaddr = pointerof(sockaddr6).as(LibC::Sockaddr*) - addrlen = LibC::SocklenT.new(sizeof(LibC::SockaddrIn6)) - - if LibC.getsockname(fd, sockaddr, pointerof(addrlen)) != 0 - raise Socket::Error.from_errno("getsockname") - end - - IPAddress.from(sockaddr, addrlen) + system_local_address end # Returns the `IPAddress` for the remote end of the IP socket. def remote_address - sockaddr6 = uninitialized LibC::SockaddrIn6 - sockaddr = pointerof(sockaddr6).as(LibC::Sockaddr*) - addrlen = LibC::SocklenT.new(sizeof(LibC::SockaddrIn6)) - - if LibC.getpeername(fd, sockaddr, pointerof(addrlen)) != 0 - raise Socket::Error.from_errno("getpeername") - end - - IPAddress.from(sockaddr, addrlen) + system_remote_address end end From 299e45331599bf16d23b8a7018cc4b0c2097fbb4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johannes=20M=C3=BCller?= Date: Mon, 17 May 2021 15:28:30 +0200 Subject: [PATCH 3/3] Extract system-specifics from TCPSocket --- src/crystal/system/socket.cr | 14 +++++++ src/crystal/system/unix/socket.cr | 69 +++++++++++++++++++++++++++++++ src/socket/tcp_socket.cr | 62 ++++++++++----------------- 3 files changed, 105 insertions(+), 40 deletions(-) diff --git a/src/crystal/system/socket.cr b/src/crystal/system/socket.cr index c41bae1716e1..9f1fc7114592 100644 --- a/src/crystal/system/socket.cr +++ b/src/crystal/system/socket.cr @@ -60,6 +60,20 @@ module Crystal::System::Socket # private def system_local_address # private def system_remote_address + + # TCPSocket: + + # private def system_tcp_keepalive_idle + + # private def system_tcp_keepalive_idle=(val : Int) + + # private def system_tcp_keepalive_interval + + # private def system_tcp_keepalive_interval=(val : Int) + + # private def system_tcp_keepalive_count + + # private def system_tcp_keepalive_count=(val : Int) end {% if flag?(:unix) %} diff --git a/src/crystal/system/unix/socket.cr b/src/crystal/system/unix/socket.cr index d4982d3b918f..6007b3b524da 100644 --- a/src/crystal/system/unix/socket.cr +++ b/src/crystal/system/unix/socket.cr @@ -262,4 +262,73 @@ module Crystal::System::Socket ::Socket::IPAddress.from(sockaddr, addrlen) end + + {% if flag?(:openbsd) %} + private def system_tcp_keepalive_idle + raise NotImplementedError.new("system_tcp_keepalive_idle") + end + + private def system_tcp_keepalive_idle=(val : Int) + raise NotImplementedError.new("system_tcp_keepalive_idle=") + end + + private def system_tcp_keepalive_interval + raise NotImplementedError.new("system_tcp_keepalive_interval") + end + + private def system_tcp_keepalive_interval=(val : Int) + raise NotImplementedError.new("system_tcp_keepalive_interval=") + end + + private def system_tcp_keepalive_count + raise NotImplementedError.new("system_tcp_keepalive_count") + end + + private def system_tcp_keepalive_count=(val : Int) + raise NotImplementedError.new("system_tcp_keepalive_count=") + end + {% else %} + private def system_tcp_keepalive_idle + optname = {% if flag?(:darwin) %} + LibC::TCP_KEEPALIVE + {% elsif flag?(:netbsd) %} + LibC::SO_KEEPALIVE + {% else %} + LibC::TCP_KEEPIDLE + {% end %} + getsockopt optname, 0, level: ::Socket::Protocol::TCP + end + + private def system_tcp_keepalive_idle=(val : Int) + optname = {% if flag?(:darwin) %} + LibC::TCP_KEEPALIVE + {% elsif flag?(:netbsd) %} + LibC::SO_KEEPALIVE + {% else %} + LibC::TCP_KEEPIDLE + {% end %} + setsockopt optname, val, level: ::Socket::Protocol::TCP + val + end + + # The amount of time in seconds between keepalive probes. + private def system_tcp_keepalive_interval + getsockopt LibC::TCP_KEEPINTVL, 0, level: ::Socket::Protocol::TCP + end + + private def system_tcp_keepalive_interval=(val : Int) + setsockopt LibC::TCP_KEEPINTVL, val, level: ::Socket::Protocol::TCP + val + end + + # The number of probes sent, without response before dropping the connection. + private def system_tcp_keepalive_count + getsockopt LibC::TCP_KEEPCNT, 0, level: ::Socket::Protocol::TCP + end + + private def system_tcp_keepalive_count=(val : Int) + setsockopt LibC::TCP_KEEPCNT, val, level: ::Socket::Protocol::TCP + val + end + {% end %} end diff --git a/src/socket/tcp_socket.cr b/src/socket/tcp_socket.cr index cbf105c80b7b..b57a3aabf1e5 100644 --- a/src/socket/tcp_socket.cr +++ b/src/socket/tcp_socket.cr @@ -70,49 +70,31 @@ class TCPSocket < IPSocket setsockopt_bool LibC::TCP_NODELAY, val, level: Protocol::TCP end - {% unless flag?(:openbsd) %} - # The amount of time in seconds the connection must be idle before sending keepalive probes. - def tcp_keepalive_idle - optname = {% if flag?(:darwin) %} - LibC::TCP_KEEPALIVE - {% elsif flag?(:netbsd) %} - LibC::SO_KEEPALIVE - {% else %} - LibC::TCP_KEEPIDLE - {% end %} - getsockopt optname, 0, level: Protocol::TCP - end + # The amount of time in seconds the connection must be idle before sending keepalive probes. + def tcp_keepalive_idle + system_tcp_keepalive_idle + end - def tcp_keepalive_idle=(val : Int) - optname = {% if flag?(:darwin) %} - LibC::TCP_KEEPALIVE - {% elsif flag?(:netbsd) %} - LibC::SO_KEEPALIVE - {% else %} - LibC::TCP_KEEPIDLE - {% end %} - setsockopt optname, val, level: Protocol::TCP - val - end + def tcp_keepalive_idle=(val : Int) + self.system_tcp_keepalive_idle = val + end - # The amount of time in seconds between keepalive probes. - def tcp_keepalive_interval - getsockopt LibC::TCP_KEEPINTVL, 0, level: Protocol::TCP - end + # The amount of time in seconds between keepalive probes. + def tcp_keepalive_interval + system_tcp_keepalive_interval + end - def tcp_keepalive_interval=(val : Int) - setsockopt LibC::TCP_KEEPINTVL, val, level: Protocol::TCP - val - end + def tcp_keepalive_interval=(val : Int) + self.system_tcp_keepalive_interval = val + val + end - # The number of probes sent, without response before dropping the connection. - def tcp_keepalive_count - getsockopt LibC::TCP_KEEPCNT, 0, level: Protocol::TCP - end + # The number of probes sent, without response before dropping the connection. + def tcp_keepalive_count + system_tcp_keepalive_count + end - def tcp_keepalive_count=(val : Int) - setsockopt LibC::TCP_KEEPCNT, val, level: Protocol::TCP - val - end - {% end %} + def tcp_keepalive_count=(val : Int) + self.system_tcp_keepalive_count = val + end end