Skip to content

Commit

Permalink
Support asynchronous IO.pipe on Windows (#13362)
Browse files Browse the repository at this point in the history
  • Loading branch information
HertzDevil authored Apr 28, 2023
1 parent 8bc95c1 commit dfa8fe4
Show file tree
Hide file tree
Showing 18 changed files with 241 additions and 125 deletions.
2 changes: 1 addition & 1 deletion spec/compiler/crystal/tools/doc/project_info_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ describe Crystal::Doc::ProjectInfo do
File.write("shard.yml", "name: foo\nversion: 1.0")
end

pending_win32 "git missing" do
it "git missing" do
Crystal::Git.executable = "git-missing-executable"

assert_with_defaults(ProjectInfo.new(nil, nil), ProjectInfo.new("foo", "1.0", refname: nil))
Expand Down
2 changes: 1 addition & 1 deletion spec/std/http/client/client_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ module HTTP
end
end

pending_win32 "will retry a broken socket" do
it "will retry a broken socket" do
server = HTTP::Server.new do |context|
context.response.output.print "foo"
context.response.output.close
Expand Down
38 changes: 18 additions & 20 deletions spec/std/http/spec_helper.cr
Original file line number Diff line number Diff line change
Expand Up @@ -61,29 +61,27 @@ end
def run_handler(handler, &)
done = Channel(Exception?).new

begin
IO::Stapled.pipe do |server_io, client_io|
processor = HTTP::Server::RequestProcessor.new(handler)
f = spawn do
processor.process(server_io, server_io)
rescue exc
done.send exc
else
done.send nil
end
IO::Stapled.pipe do |server_io, client_io|
processor = HTTP::Server::RequestProcessor.new(handler)
f = spawn do
processor.process(server_io, server_io)
rescue exc
done.send exc
else
done.send nil
end

client = HTTP::Client.new(client_io)
client = HTTP::Client.new(client_io)

begin
wait_until_blocked f
begin
wait_until_blocked f

yield client
ensure
processor.close
server_io.close
if exc = done.receive
raise exc
end
yield client
ensure
processor.close
server_io.close
if exc = done.receive
raise exc
end
end
end
Expand Down
8 changes: 4 additions & 4 deletions spec/std/io/io_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ end

describe IO do
describe "partial read" do
pending_win32 "doesn't block on first read. blocks on 2nd read" do
it "doesn't block on first read. blocks on 2nd read" do
IO.pipe do |read, write|
write.puts "hello"
slice = Bytes.new 1024
Expand Down Expand Up @@ -920,8 +920,8 @@ describe IO do
end
{% end %}

pending_win32 describe: "#close" do
it "aborts 'read' in a different thread" do
describe "#close" do
it "aborts 'read' in a different fiber" do
ch = Channel(SpecChannelStatus).new(1)

IO.pipe do |read, write|
Expand All @@ -942,7 +942,7 @@ describe IO do
end
end

it "aborts 'write' in a different thread" do
it "aborts 'write' in a different fiber" do
ch = Channel(SpecChannelStatus).new(1)

IO.pipe do |read, write|
Expand Down
2 changes: 1 addition & 1 deletion spec/std/log/io_backend_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ private def io_logger(*, stdout : IO, config = nil, source : String = "")
end

describe Log::IOBackend do
pending_win32 "creates with defaults" do
it "creates with defaults" do
backend = Log::IOBackend.new
backend.io.should eq(STDOUT)
backend.formatter.should eq(Log::ShortFormat)
Expand Down
2 changes: 1 addition & 1 deletion spec/std/oauth2/client_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ describe OAuth2::Client do
end
end

pending_win32 describe: "get_access_token_using_*" do
describe "get_access_token_using_*" do
describe "using HTTP Basic authentication to pass credentials" do
it "#get_access_token_using_authorization_code" do
handler = HTTP::Handler::HandlerProc.new do |context|
Expand Down
8 changes: 5 additions & 3 deletions spec/std/process_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -137,12 +137,12 @@ describe Process do
value.should eq("hello#{newline}")
end

pending_win32 "sends input in IO" do
it "sends input in IO" do
value = Process.run(*stdin_to_stdout_command, input: IO::Memory.new("hello")) do |proc|
proc.input?.should be_nil
proc.output.gets_to_end
end
value.should eq("hello")
value.chomp.should eq("hello")
end

it "sends output to IO" do
Expand Down Expand Up @@ -305,6 +305,8 @@ describe Process do
{% end %}
end

# TODO: this spec gives "WaitForSingleObject: The handle is invalid."
# is this because standard streams on windows aren't async?
pending_win32 "can link processes together" do
buffer = IO::Memory.new
Process.run(*stdin_to_stdout_command) do |cat|
Expand All @@ -313,7 +315,7 @@ describe Process do
cat.close
end
end
buffer.to_s.lines.size.should eq(1000)
buffer.to_s.chomp.lines.size.should eq(1000)
end
end

Expand Down
4 changes: 4 additions & 0 deletions src/crystal/system/unix/file_descriptor.cr
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ module Crystal::System::FileDescriptor
fcntl(LibC::F_SETFL, new_flags) unless new_flags == current_flags
end

private def system_blocking_init(value)
self.system_blocking = false unless value
end

private def system_close_on_exec?
flags = fcntl(LibC::F_GETFD)
flags.bits_set? LibC::FD_CLOEXEC
Expand Down
3 changes: 3 additions & 0 deletions src/crystal/system/wasi/file_descriptor.cr
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ module Crystal::System::FileDescriptor
raise NotImplementedError.new "Crystal::System::FileDescriptor.pipe"
end

private def system_blocking_init(value)
end

private def system_reopen(other : IO::FileDescriptor)
raise NotImplementedError.new "Crystal::System::FileDescriptor#system_reopen"
end
Expand Down
85 changes: 62 additions & 23 deletions src/crystal/system/win32/file_descriptor.cr
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,50 @@ require "c/io"
require "c/consoleapi"
require "c/consoleapi2"
require "c/winnls"
require "io/overlapped"

module Crystal::System::FileDescriptor
include IO::Overlapped

@volatile_fd : Atomic(LibC::Int)
@system_blocking = true

private def unbuffered_read(slice : Bytes)
bytes_read = LibC._read(fd, slice, slice.size)
if bytes_read == -1
if Errno.value == Errno::EBADF
raise IO::Error.new "File not open for reading"
else
raise IO::Error.from_errno("Error reading file")
if system_blocking?
bytes_read = LibC._read(fd, slice, slice.size)
if bytes_read == -1
if Errno.value == Errno::EBADF
raise IO::Error.new "File not open for reading"
else
raise IO::Error.from_errno("Error reading file")
end
end
bytes_read
else
handle = windows_handle
overlapped_operation(handle, "ReadFile", read_timeout) do |overlapped|
ret = LibC.ReadFile(handle, slice, slice.size, out byte_count, overlapped)
{ret, byte_count}
end
end
bytes_read
end

private def unbuffered_write(slice : Bytes)
until slice.empty?
bytes_written = LibC._write(fd, slice, slice.size)
if bytes_written == -1
if Errno.value == Errno::EBADF
raise IO::Error.new "File not open for writing"
else
raise IO::Error.from_errno("Error writing file")
if system_blocking?
bytes_written = LibC._write(fd, slice, slice.size)
if bytes_written == -1
if Errno.value == Errno::EBADF
raise IO::Error.new "File not open for writing"
else
raise IO::Error.from_errno("Error writing file")
end
end
else
handle = windows_handle
bytes_written = overlapped_operation(handle, "WriteFile", write_timeout, writing: true) do |overlapped|
ret = LibC.WriteFile(handle, slice, slice.size, out byte_count, overlapped)
{ret, byte_count}
end
end

Expand All @@ -34,11 +54,17 @@ module Crystal::System::FileDescriptor
end

private def system_blocking?
true
@system_blocking
end

private def system_blocking=(blocking)
raise NotImplementedError.new("Crystal::System::FileDescriptor#system_blocking=") unless blocking
unless blocking == @system_blocking
raise IO::Error.new("Cannot reconfigure `IO::FileDescriptor#blocking` after creation")
end
end

private def system_blocking_init(value)
@system_blocking = value
end

private def system_close_on_exec?
Expand Down Expand Up @@ -124,11 +150,12 @@ module Crystal::System::FileDescriptor
end

private def system_close
LibC.CancelIoEx(windows_handle, nil) unless system_blocking?

file_descriptor_close
end

def file_descriptor_close
err = nil
if LibC._close(fd) != 0
case Errno.value
when Errno::EINTR
Expand All @@ -139,14 +166,26 @@ module Crystal::System::FileDescriptor
end
end

def self.pipe(read_blocking, write_blocking)
pipe_fds = uninitialized StaticArray(LibC::Int, 2)
if LibC._pipe(pipe_fds, 8192, LibC::O_BINARY | LibC::O_NOINHERIT) != 0
raise IO::Error.from_errno("Could not create pipe")
end
private PIPE_BUFFER_SIZE = 8192

r = IO::FileDescriptor.new(pipe_fds[0], read_blocking)
w = IO::FileDescriptor.new(pipe_fds[1], write_blocking)
def self.pipe(read_blocking, write_blocking)
pipe_name = ::Path.windows(::File.tempname("crystal", nil, dir: %q(\\.\pipe))).normalize.to_s
pipe_mode = 0 # PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT

w_pipe_flags = LibC::PIPE_ACCESS_OUTBOUND | LibC::FILE_FLAG_FIRST_PIPE_INSTANCE
w_pipe_flags |= LibC::FILE_FLAG_OVERLAPPED unless write_blocking
w_pipe = LibC.CreateNamedPipeA(pipe_name, w_pipe_flags, pipe_mode, 1, PIPE_BUFFER_SIZE, PIPE_BUFFER_SIZE, 0, nil)
raise IO::Error.from_winerror("CreateNamedPipeA") if w_pipe == LibC::INVALID_HANDLE_VALUE
Crystal::Scheduler.event_loop.create_completion_port(w_pipe) unless write_blocking

r_pipe_flags = LibC::FILE_FLAG_NO_BUFFERING
r_pipe_flags |= LibC::FILE_FLAG_OVERLAPPED unless read_blocking
r_pipe = LibC.CreateFileW(System.to_wstr(pipe_name), LibC::GENERIC_READ | LibC::FILE_WRITE_ATTRIBUTES, 0, nil, LibC::OPEN_EXISTING, r_pipe_flags, nil)
raise IO::Error.from_winerror("CreateFileW") if r_pipe == LibC::INVALID_HANDLE_VALUE
Crystal::Scheduler.event_loop.create_completion_port(r_pipe) unless read_blocking

r = IO::FileDescriptor.new(LibC._open_osfhandle(r_pipe, 0), read_blocking)
w = IO::FileDescriptor.new(LibC._open_osfhandle(w_pipe, 0), write_blocking)
w.sync = true

{r, w}
Expand Down
53 changes: 52 additions & 1 deletion src/crystal/system/win32/socket.cr
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,26 @@ module Crystal::System::Socket
end
end

private def overlapped_connect(socket, method, &)
OverlappedOperation.run(socket) do |operation|
yield operation.start

schedule_overlapped(read_timeout || 1.seconds)

operation.wsa_result(socket) do |error|
case error
when .wsa_io_incomplete?, .wsaeconnrefused?
return ::Socket::ConnectError.from_os_error(method, error)
when .error_operation_aborted?
# FIXME: Not sure why this is necessary
return ::Socket::ConnectError.from_os_error(method, error)
end
end

nil
end
end

private def system_connect_connectionless(addr, timeout, &)
ret = LibC.connect(fd, addr, addr.size)
if ret == LibC::SOCKET_ERROR
Expand Down Expand Up @@ -206,6 +226,25 @@ module Crystal::System::Socket
true
end

private def overlapped_accept(socket, method, &)
OverlappedOperation.run(socket) do |operation|
yield operation.start

unless schedule_overlapped(read_timeout)
raise IO::TimeoutError.new("accept timed out")
end

operation.wsa_result(socket) do |error|
case error
when .wsa_io_incomplete?, .wsaenotsock?
return false
end
end

true
end
end

private def wsa_buffer(bytes)
wsabuf = LibC::WSABUF.new
wsabuf.len = bytes.size
Expand Down Expand Up @@ -402,7 +441,7 @@ module Crystal::System::Socket
private def unbuffered_read(slice : Bytes)
wsabuf = wsa_buffer(slice)

bytes_read = overlapped_operation(fd, "WSARecv", read_timeout, connreset_is_error: false) do |overlapped|
bytes_read = overlapped_read(fd, "WSARecv", connreset_is_error: false) do |overlapped|
flags = 0_u32
LibC.WSARecv(fd, pointerof(wsabuf), 1, out bytes_received, pointerof(flags), overlapped, nil)
end
Expand All @@ -419,6 +458,18 @@ module Crystal::System::Socket
bytes.to_i32
end

private def overlapped_write(socket, method, &)
wsa_overlapped_operation(socket, method, write_timeout) do |operation|
yield operation
end
end

private def overlapped_read(socket, method, *, connreset_is_error = true, &)
wsa_overlapped_operation(socket, method, read_timeout, connreset_is_error) do |operation|
yield operation
end
end

def system_close
handle = @volatile_fd.swap(LibC::INVALID_SOCKET)

Expand Down
4 changes: 1 addition & 3 deletions src/io/file_descriptor.cr
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,7 @@ class IO::FileDescriptor < IO
end
end

unless blocking || {{ flag?(:win32) || flag?(:wasi) }}
self.blocking = false
end
system_blocking_init(blocking)
end

# :nodoc:
Expand Down
Loading

0 comments on commit dfa8fe4

Please sign in to comment.