Skip to content

Commit

Permalink
Emulate non-blocking STDIN console on Windows (#14947)
Browse files Browse the repository at this point in the history
  • Loading branch information
HertzDevil committed Sep 5, 2024
1 parent 1055af2 commit 95af602
Show file tree
Hide file tree
Showing 6 changed files with 115 additions and 11 deletions.
9 changes: 9 additions & 0 deletions src/crystal/system/file_descriptor.cr
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,15 @@ module Crystal::System::FileDescriptor
# Also used in `IO::FileDescriptor#finalize`.
# def file_descriptor_close

# Returns `true` or `false` if this file descriptor pretends to block or not
# to block the caller thread regardless of the underlying internal file
# descriptor's implementation. Returns `nil` if nothing needs to be done, i.e.
# `#blocking` is identical to `#system_blocking?`.
#
# Currently used by console STDIN on Windows.
private def emulated_blocking? : Bool?
end

private def system_read(slice : Bytes) : Int32
event_loop.read(self, slice)
end
Expand Down
64 changes: 63 additions & 1 deletion src/crystal/system/win32/file_descriptor.cr
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ require "c/consoleapi"
require "c/consoleapi2"
require "c/winnls"
require "crystal/system/win32/iocp"
require "crystal/system/thread"

module Crystal::System::FileDescriptor
# Platform-specific type to represent a file descriptor handle to the operating
Expand Down Expand Up @@ -76,13 +77,24 @@ module Crystal::System::FileDescriptor
bytes_written
end

def emulated_blocking? : Bool?
# reading from STDIN is done via a separate thread (see
# `ConsoleUtils.read_console` below)
handle = windows_handle
if LibC.GetConsoleMode(handle, out _) != 0
if handle == LibC.GetStdHandle(LibC::STD_INPUT_HANDLE)
return false
end
end
end

# :nodoc:
def system_blocking?
@system_blocking
end

private def system_blocking=(blocking)
unless blocking == @system_blocking
unless blocking == self.blocking
raise IO::Error.new("Cannot reconfigure `IO::FileDescriptor#blocking` after creation")
end
end
Expand Down Expand Up @@ -339,7 +351,11 @@ module Crystal::System::FileDescriptor
end
end

# `blocking` must be set to `true` because the underlying handles never
# support overlapped I/O; instead, `#emulated_blocking?` should return
# `false` for `STDIN` as it uses a separate thread
io = IO::FileDescriptor.new(handle.address, blocking: true)

# Set sync or flush_on_newline as described in STDOUT and STDERR docs.
# See https://crystal-lang.org/api/toplevel.html#STDERR
if console_handle
Expand Down Expand Up @@ -465,11 +481,57 @@ private module ConsoleUtils
end

private def self.read_console(handle : LibC::HANDLE, slice : Slice(UInt16)) : Int32
@@mtx.synchronize do
@@read_requests << ReadRequest.new(
handle: handle,
slice: slice,
iocp: Crystal::EventLoop.current.iocp,
completion_key: Crystal::IOCP::CompletionKey.new(:stdin_read, ::Fiber.current),
)
@@read_cv.signal
end

::Fiber.suspend

@@mtx.synchronize do
@@bytes_read.shift
end
end

private def self.read_console_blocking(handle : LibC::HANDLE, slice : Slice(UInt16)) : Int32
if 0 == LibC.ReadConsoleW(handle, slice, slice.size, out units_read, nil)
raise IO::Error.from_winerror("ReadConsoleW")
end
units_read.to_i32
end

record ReadRequest, handle : LibC::HANDLE, slice : Slice(UInt16), iocp : LibC::HANDLE, completion_key : Crystal::IOCP::CompletionKey

@@read_cv = ::Thread::ConditionVariable.new
@@read_requests = Deque(ReadRequest).new
@@bytes_read = Deque(Int32).new
@@mtx = ::Thread::Mutex.new
@@reader_thread = ::Thread.new { reader_loop }

private def self.reader_loop
while true
request = @@mtx.synchronize do
loop do
if entry = @@read_requests.shift?
break entry
end
@@read_cv.wait(@@mtx)
end
end

bytes = read_console_blocking(request.handle, request.slice)

@@mtx.synchronize do
@@bytes_read << bytes
LibC.PostQueuedCompletionStatus(request.iocp, LibC::JOB_OBJECT_MSG_EXIT_PROCESS, request.completion_key.object_id, nil)
end
end
end
end

# Enable UTF-8 console I/O for the duration of program execution
Expand Down
35 changes: 26 additions & 9 deletions src/crystal/system/win32/iocp.cr
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,16 @@ require "crystal/system/thread_linked_list"
module Crystal::IOCP
# :nodoc:
class CompletionKey
enum Tag
ProcessRun
StdinRead
end

property fiber : Fiber?
getter tag : Tag

def initialize(@tag : Tag, @fiber : Fiber? = nil)
end
end

def self.wait_queued_completions(timeout, alertable = false, &)
Expand Down Expand Up @@ -39,20 +48,19 @@ module Crystal::IOCP
# at the moment only `::Process#wait` uses a non-nil completion key; all
# I/O operations, including socket ones, do not set this field
case completion_key = Pointer(Void).new(entry.lpCompletionKey).as(CompletionKey?)
when Nil
in Nil
operation = OverlappedOperation.unbox(entry.lpOverlapped)
operation.schedule { |fiber| yield fiber }
else
case entry.dwNumberOfBytesTransferred
when LibC::JOB_OBJECT_MSG_EXIT_PROCESS, LibC::JOB_OBJECT_MSG_ABNORMAL_EXIT_PROCESS
in CompletionKey
if completion_key_valid?(completion_key, entry.dwNumberOfBytesTransferred)
# if `Process` exits before a call to `#wait`, this fiber will be
# reset already
if fiber = completion_key.fiber
# this ensures the `::Process` doesn't keep an indirect reference to
# `::Thread.current`, as that leads to a finalization cycle
# this ensures existing references to `completion_key` do not keep
# an indirect reference to `::Thread.current`, as that leads to a
# finalization cycle
completion_key.fiber = nil

yield fiber
else
# the `Process` exits before a call to `#wait`; do nothing
end
end
end
Expand All @@ -61,6 +69,15 @@ module Crystal::IOCP
false
end

private def self.completion_key_valid?(completion_key, number_of_bytes_transferred)
case completion_key.tag
in .process_run?
number_of_bytes_transferred.in?(LibC::JOB_OBJECT_MSG_EXIT_PROCESS, LibC::JOB_OBJECT_MSG_ABNORMAL_EXIT_PROCESS)
in .stdin_read?
true
end
end

class OverlappedOperation
enum State
STARTED
Expand Down
2 changes: 1 addition & 1 deletion src/crystal/system/win32/process.cr
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ struct Crystal::System::Process
@thread_id : LibC::DWORD
@process_handle : LibC::HANDLE
@job_object : LibC::HANDLE
@completion_key = IOCP::CompletionKey.new
@completion_key = IOCP::CompletionKey.new(:process_run)

@@interrupt_handler : Proc(::Process::ExitReason, Nil)?
@@interrupt_count = Crystal::AtomicSemaphore.new
Expand Down
8 changes: 8 additions & 0 deletions src/io/file_descriptor.cr
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,15 @@ class IO::FileDescriptor < IO
Crystal::System::FileDescriptor.from_stdio(fd)
end

# Returns whether I/O operations on this file descriptor block the current
# thread. If false, operations might opt to suspend the current fiber instead.
#
# This might be different from the internal file descriptor. For example, when
# `STDIN` is a terminal on Windows, this returns `false` since the underlying
# blocking reads are done on a completely separate thread.
def blocking
emulated = emulated_blocking?
return emulated unless emulated.nil?
system_blocking?
end

Expand Down
8 changes: 8 additions & 0 deletions src/lib_c/x86_64-windows-msvc/c/ioapiset.cr
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,14 @@ lib LibC
dwMilliseconds : DWORD,
fAlertable : BOOL
) : BOOL

fun PostQueuedCompletionStatus(
completionPort : HANDLE,
dwNumberOfBytesTransferred : DWORD,
dwCompletionKey : ULONG_PTR,
lpOverlapped : OVERLAPPED*
) : BOOL

fun CancelIoEx(
hFile : HANDLE,
lpOverlapped : OVERLAPPED*
Expand Down

0 comments on commit 95af602

Please sign in to comment.