Skip to content

Commit

Permalink
named-pipes: fix receiving IOCP events after deregister
Browse files Browse the repository at this point in the history
Backport of the following commits
 * 04c3bb6
 * dc3b3f6
 * 6fd5dd0
  • Loading branch information
carllerche authored and Thomasdezeeuw committed Mar 1, 2024
1 parent c710a30 commit 90d4fe0
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 12 deletions.
8 changes: 8 additions & 0 deletions src/sys/windows/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,14 @@ impl Event {
pub(super) fn to_completion_status(&self) -> CompletionStatus {
CompletionStatus::new(self.flags, self.data as usize, std::ptr::null_mut())
}

#[cfg(feature = "os-ext")]
pub(super) fn to_completion_status_with_overlapped(
&self,
overlapped: *mut super::Overlapped,
) -> CompletionStatus {
CompletionStatus::new(self.flags, self.data as usize, overlapped)
}
}

pub(crate) const READABLE_FLAGS: u32 = afd::POLL_RECEIVE
Expand Down
83 changes: 71 additions & 12 deletions src/sys/windows/named_pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ struct Inner {
connect: Overlapped,
read: Overlapped,
write: Overlapped,
event: Overlapped,
// END NOTE.
handle: Handle,
connecting: AtomicBool,
Expand All @@ -110,10 +111,16 @@ impl Inner {

/// Same as [`ptr_from_conn_overlapped`] but for `Inner.write`.
unsafe fn ptr_from_write_overlapped(ptr: *mut OVERLAPPED) -> *const Inner {
// `read` is after `connect: Overlapped` and `read: Overlapped`.
// `write` is after `connect: Overlapped` and `read: Overlapped`.
(ptr as *mut Overlapped).wrapping_sub(2) as *const Inner
}

/// Same as [`ptr_from_conn_overlapped`] but for `Inner.event`.
unsafe fn ptr_from_event_overlapped(ptr: *mut OVERLAPPED) -> *const Inner {
// `event` is after `connect: Overlapped`, `read: Overlapped`, and `write: Overlapped`.
(ptr as *mut Overlapped).wrapping_sub(3) as *const Inner
}

/// Issue a connection request with the specified overlapped operation.
///
/// This function will issue a request to connect a client to this server,
Expand Down Expand Up @@ -478,6 +485,7 @@ impl FromRawHandle for NamedPipe {
connecting: AtomicBool::new(false),
read: Overlapped::new(read_done),
write: Overlapped::new(write_done),
event: Overlapped::new(event_done),
io: Mutex::new(Io {
cp: None,
token: None,
Expand Down Expand Up @@ -724,7 +732,7 @@ impl Inner {
// out the error.
Err(e) => {
io.read = State::Err(e);
io.notify_readable(events);
io.notify_readable(me, events);
true
}
}
Expand Down Expand Up @@ -787,7 +795,7 @@ impl Inner {
Ok(None) => (),
Err(e) => {
io.write = State::Err(e);
io.notify_writable(events);
io.notify_writable(me, events);
}
}
}
Expand All @@ -797,7 +805,7 @@ impl Inner {
#[allow(clippy::needless_option_as_deref)]
if Inner::schedule_read(me, &mut io, events.as_deref_mut()) {
if let State::None = io.write {
io.notify_writable(events);
io.notify_writable(me, events);
}
}
}
Expand Down Expand Up @@ -877,7 +885,7 @@ fn read_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec<Event>>) {
}

// Flag our readiness that we've got data.
io.notify_readable(events);
io.notify_readable(&me, events);
}

fn write_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec<Event>>) {
Expand All @@ -895,7 +903,7 @@ fn write_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec<Event>>) {
// `Ok` here means, that the operation was completed immediately
// `bytes_transferred` is already reported to a client
State::Ok(..) => {
io.notify_writable(events);
io.notify_writable(&me, events);
return;
}
State::Pending(buf, pos) => (buf, pos),
Expand All @@ -909,20 +917,46 @@ fn write_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec<Event>>) {
let new_pos = pos + (status.bytes_transferred() as usize);
if new_pos == buf.len() {
me.put_buffer(buf);
io.notify_writable(events);
io.notify_writable(&me, events);
} else {
Inner::schedule_write(&me, buf, new_pos, &mut io, events);
}
}
Err(e) => {
debug_assert_eq!(status.bytes_transferred(), 0);
io.write = State::Err(e);
io.notify_writable(events);
io.notify_writable(&me, events);
}
}
}
}

fn event_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec<Event>>) {
let status = CompletionStatus::from_entry(status);

// Acquire the `Arc<Inner>`. Note that we should be guaranteed that
// the refcount is available to us due to the `mem::forget` in
// `schedule_write` above.
let me = unsafe { Arc::from_raw(Inner::ptr_from_event_overlapped(status.overlapped())) };

let io = me.io.lock().unwrap();

// Make sure the I/O handle is still registered with the selector
if io.token.is_some() {
// This method is also called during `Selector::drop` to perform
// cleanup. In this case, `events` is `None` and we don't need to track
// the event.
if let Some(events) = events {
let mut ev = Event::from_completion_status(&status);
// Reverse the `.data` alteration done in `schedule_event`. This
// alteration was done so the selector recognized the event as one from
// a named pipe.
ev.data >>= 1;
events.push(ev);
}
}
}

impl Io {
fn check_association(&self, registry: &Registry, required: bool) -> io::Result<()> {
match self.cp {
Expand All @@ -938,28 +972,53 @@ impl Io {
}
}

fn notify_readable(&self, events: Option<&mut Vec<Event>>) {
fn notify_readable(&self, me: &Arc<Inner>, events: Option<&mut Vec<Event>>) {
if let Some(token) = self.token {
let mut ev = Event::new(token);
ev.set_readable();

if let Some(events) = events {
events.push(ev);
} else {
let _ = self.cp.as_ref().unwrap().post(ev.to_completion_status());
self.schedule_event(me, ev);
}
}
}

fn notify_writable(&self, events: Option<&mut Vec<Event>>) {
fn notify_writable(&self, me: &Arc<Inner>, events: Option<&mut Vec<Event>>) {
if let Some(token) = self.token {
let mut ev = Event::new(token);
ev.set_writable();

if let Some(events) = events {
events.push(ev);
} else {
let _ = self.cp.as_ref().unwrap().post(ev.to_completion_status());
self.schedule_event(me, ev);
}
}
}

fn schedule_event(&self, me: &Arc<Inner>, mut event: Event) {
// Alter the token so that the selector will identify the IOCP event as
// one for a named pipe. This will be reversed in `event_done`
//
// `data` for named pipes is an auto-incrementing counter. Because
// `data` is `u64` we do not risk losing the most-significant bit
// (unless a user creates 2^62 named pipes during the lifetime of the
// process).
event.data <<= 1;
event.data += 1;

let completion_status =
event.to_completion_status_with_overlapped(me.event.as_ptr() as *mut _);

match self.cp.as_ref().unwrap().post(completion_status) {
Ok(_) => {
// Increase the ref count of `Inner` for the completion event.
mem::forget(me.clone());
}
Err(_) => {
// Nothing to do here
}
}
}
Expand Down

0 comments on commit 90d4fe0

Please sign in to comment.