Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update to A10 v0.2 #635

Merged
merged 6 commits into from
Jun 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions http/src/head/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1018,8 +1018,7 @@ macro_rules! int_impl {
Some(v) => value = v,
None => return Err(ParseIntError),
}
#[allow(trivial_numeric_casts)] // For `u8 as u8`.
match value.checked_add((b - b'0') as $ty) {
match value.checked_add(<$ty>::from(b - b'0')) {
Some(v) => value = v,
None => return Err(ParseIntError),
}
Expand Down
8 changes: 4 additions & 4 deletions http/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -696,22 +696,22 @@ impl Connection {
}

/// See [`TcpStream::peer_addr`].
pub fn peer_addr(&mut self) -> io::Result<SocketAddr> {
pub fn peer_addr(&self) -> io::Result<SocketAddr> {
self.stream.peer_addr()
}

/// See [`TcpStream::local_addr`].
pub fn local_addr(&mut self) -> io::Result<SocketAddr> {
pub fn local_addr(&self) -> io::Result<SocketAddr> {
self.stream.local_addr()
}

/// See [`TcpStream::set_nodelay`].
pub fn set_nodelay(&mut self, nodelay: bool) -> io::Result<()> {
pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> {
self.stream.set_nodelay(nodelay)
}

/// See [`TcpStream::nodelay`].
pub fn nodelay(&mut self) -> io::Result<bool> {
pub fn nodelay(&self) -> io::Result<bool> {
self.stream.nodelay()
}

Expand Down
2 changes: 0 additions & 2 deletions http/tests/functional.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

#![feature(async_iterator, never_type)]

use std::mem::size_of;

#[track_caller]
fn assert_size<T>(expected: usize) {
assert_eq!(size_of::<T>(), expected);
Expand Down
5 changes: 5 additions & 0 deletions inbox/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -650,6 +650,7 @@ impl Error for RecvError {}

impl<T> Receiver<T> {
/// Attempts to receive a value from this channel.
#[allow(clippy::needless_pass_by_ref_mut)]
pub fn try_recv(&mut self) -> Result<T, RecvError> {
try_recv(self.channel())
}
Expand All @@ -663,13 +664,15 @@ impl<T> Receiver<T> {
/// [`Poll::Pending`] instead.
///
/// [disconnected]: Receiver::is_connected
#[allow(clippy::needless_pass_by_ref_mut)]
pub fn recv(&mut self) -> RecvValue<T> {
RecvValue {
channel: self.channel(),
}
}

/// Attempts to peek a value from this channel.
#[allow(clippy::needless_pass_by_ref_mut)]
pub fn try_peek(&mut self) -> Result<&T, RecvError> {
try_peek(self.channel())
}
Expand All @@ -683,6 +686,7 @@ impl<T> Receiver<T> {
/// [`Poll::Pending`] instead.
///
/// [disconnected]: Receiver::is_connected
#[allow(clippy::needless_pass_by_ref_mut)]
pub fn peek(&mut self) -> PeekValue<T> {
PeekValue {
channel: self.channel(),
Expand Down Expand Up @@ -743,6 +747,7 @@ impl<T> Receiver<T> {
///
/// This is useful if you can't call [`Receiver::recv`] but still want a
/// wake-up notification once messages are added to the inbox.
#[allow(clippy::needless_pass_by_ref_mut)]
pub fn register_waker(&mut self, waker: &task::Waker) -> bool {
self.channel().receiver_waker.register(waker)
}
Expand Down
3 changes: 3 additions & 0 deletions inbox/src/oneshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ impl<T> Receiver<T> {
///
/// If it succeeds it returns the value and resets the channel, returning a
/// new [`Sender`] (which can send a value to this `Receiver`).
#[allow(clippy::needless_pass_by_ref_mut)]
pub fn try_recv(&mut self) -> Result<T, RecvError> {
let shared = self.shared();
// SAFETY: `AcqRel` is required here to ensure it syncs with
Expand Down Expand Up @@ -274,6 +275,7 @@ impl<T> Receiver<T> {
/// # Notes
///
/// If the channel contains a value it will be dropped.
#[allow(clippy::needless_pass_by_ref_mut)]
pub fn try_reset(&mut self) -> Option<Sender<T>> {
let shared = self.shared();
// SAFETY: `Acquire` is required here to ensure it syncs with
Expand Down Expand Up @@ -317,6 +319,7 @@ impl<T> Receiver<T> {
///
/// This is useful if you can't call [`Receiver::recv`] but still want a
/// wake-up notification once messages are added to the inbox.
#[allow(clippy::needless_pass_by_ref_mut)]
pub fn register_waker(&mut self, waker: &task::Waker) -> bool {
let shared = self.shared();
let mut receiver_waker = shared.receiver_waker.lock().unwrap();
Expand Down
1 change: 0 additions & 1 deletion inbox/src/tests.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
//! Tests for the internal API.

use std::future::Future;
use std::mem::{size_of, size_of_val};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{self, Poll, Wake};
Expand Down
2 changes: 1 addition & 1 deletion rt/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ default = []
test = ["heph/test"]

[dependencies]
a10 = { version = "0.1.9", default-features = false, features = ["nightly"] }
a10 = { version = "0.2.0", default-features = false, features = ["nightly"] }
heph = { version = "0.5.0", path = "../", default-features = false }
heph-inbox = { version = "0.2.3", path = "../inbox", default-features = false }
log = { version = "0.4.21", default-features = false, features = ["kv_std"] }
Expand Down
2 changes: 2 additions & 0 deletions rt/src/access.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ impl ThreadSafe {
/// Spawn a thread-safe [`Future`].
///
/// See [`RuntimeRef::spawn_future`] for more documentation.
#[allow(clippy::needless_pass_by_ref_mut)]
pub fn spawn_future<Fut>(&mut self, future: Fut, options: FutureOptions)
where
Fut: Future<Output = ()> + Send + std::marker::Sync + 'static,
Expand Down Expand Up @@ -414,6 +415,7 @@ impl Sync {
/// Spawn a thread-safe [`Future`].
///
/// See [`RuntimeRef::spawn_future`] for more documentation.
#[allow(clippy::needless_pass_by_ref_mut)]
pub fn spawn_future<Fut>(&mut self, future: Fut, options: FutureOptions)
where
Fut: Future<Output = ()> + Send + std::marker::Sync + 'static,
Expand Down
6 changes: 3 additions & 3 deletions rt/src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::pin::Pin;
use std::sync::mpsc;
use std::task::{self, Poll};

use a10::msg::{MsgListener, MsgToken};
use a10::msg::{msg_listener, try_send_msg, MsgListener, MsgToken};

use crate::wakers::no_ring_ctx;

Expand All @@ -20,7 +20,7 @@ const WAKE: u32 = u32::from_ne_bytes([b'W', b'A', b'K', b'E']); // 1162559831.
///
/// The `sq` will be used to wake up the receiving end when sending.
pub(crate) fn new<T>(sq: a10::SubmissionQueue) -> io::Result<(Sender<T>, Receiver<T>)> {
let (listener, token) = sq.clone().msg_listener()?;
let (listener, token) = msg_listener(sq.clone())?;
let (sender, receiver) = mpsc::channel();
let sender = Sender { sender, sq, token };
let receiver = Receiver { receiver, listener };
Expand All @@ -43,7 +43,7 @@ impl<T> Sender<T> {
self.sender
.send(msg)
.map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "receiver closed channel"))?;
self.sq.try_send_msg(self.token, WAKE)
try_send_msg(&self.sq, self.token, WAKE)
}
}

Expand Down
2 changes: 1 addition & 1 deletion rt/src/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use std::task::{self, Poll};
use std::time::{Duration, Instant};
use std::{fmt, io, process};

use a10::signals::{ReceiveSignals, Signals};
use a10::process::{ReceiveSignals, Signals};
use heph::actor_ref::ActorGroup;
use log::{debug, error, info, trace};

Expand Down
4 changes: 2 additions & 2 deletions rt/src/fs/watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@
use std::borrow::Cow;
use std::collections::HashMap;
use std::ffi::{CString, OsStr, OsString};
use std::mem::{size_of, take};
use std::mem::take;
use std::os::fd::{AsFd, AsRawFd, BorrowedFd, RawFd};
use std::path::{Path, PathBuf};
use std::{fmt, io, ptr};
Expand Down Expand Up @@ -269,7 +269,7 @@ impl<'w> Iterator for Events<'w> {
// it should be valid.
debug_assert!(self.buf.len() >= self.processed);
let event: &'w Event =
unsafe { &*ptr::from_raw_parts(ptr::from_ref(event).cast(), event.len as usize) };
unsafe { &*ptr::from_raw_parts(ptr::from_ref(event), event.len as usize) };

Some(event)
}
Expand Down
6 changes: 5 additions & 1 deletion rt/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,7 @@ impl Runtime {
/// Spawn a thread-safe [`Future`].
///
/// See [`RuntimeRef::spawn_future`] for more documentation.
#[allow(clippy::needless_pass_by_ref_mut)]
pub fn spawn_future<Fut>(&mut self, future: Fut, options: FutureOptions)
where
Fut: Future<Output = ()> + Send + std::marker::Sync + 'static,
Expand Down Expand Up @@ -591,7 +592,7 @@ impl RuntimeRef {
/// Similar to thread-local actors this will only run on a single thread.
/// See the discussion of thread-local vs. thread-safe actors in the
/// [`spawn`] module for additional information.
#[allow(clippy::needless_pass_by_value)]
#[allow(clippy::needless_pass_by_value, clippy::needless_pass_by_ref_mut)]
pub fn spawn_local_future<Fut>(&mut self, future: Fut, options: FutureOptions)
where
Fut: Future<Output = ()> + 'static,
Expand All @@ -611,6 +612,7 @@ impl RuntimeRef {
/// Similar to thread-safe actors this can run on any of the workers
/// threads. See the discussion of thread-local vs. thread-safe actors in
/// the [`spawn`] module for additional information.
#[allow(clippy::needless_pass_by_ref_mut)]
pub fn spawn_future<Fut>(&mut self, future: Fut, options: FutureOptions)
where
Fut: Future<Output = ()> + Send + std::marker::Sync + 'static,
Expand All @@ -624,6 +626,7 @@ impl RuntimeRef {
/// receive a process signal.
///
/// [process signals]: Signal
#[allow(clippy::needless_pass_by_ref_mut)]
pub fn receive_signals(&mut self, actor_ref: ActorRef<Signal>) {
self.internals
.signal_receivers
Expand Down Expand Up @@ -656,6 +659,7 @@ impl RuntimeRef {
trace::start(&*self.internals.trace_log.borrow())
}

#[allow(clippy::needless_pass_by_ref_mut)]
fn finish_trace(
&mut self,
substream_id: u64,
Expand Down
2 changes: 1 addition & 1 deletion rt/src/net/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
//! [Unix listening socket]: crate::net::UnixListener
//! [Unix datagram socket]: crate::net::UnixDatagram

use std::mem::{size_of, MaybeUninit};
use std::mem::MaybeUninit;
use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
use std::{fmt, io, ptr};

Expand Down
2 changes: 1 addition & 1 deletion rt/src/net/tcp/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ impl TcpListener {
/// The CPU affinity is **not** set on the returned TCP stream. To set that
/// use [`TcpStream::set_auto_cpu_affinity`].
#[allow(clippy::doc_markdown)] // For "io_uring".
pub const fn incoming(&self) -> Incoming<'_> {
pub fn incoming(&self) -> Incoming<'_> {
Incoming(self.fd.multishot_accept())
}

Expand Down
2 changes: 1 addition & 1 deletion rt/src/net/uds/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ impl UnixListener {
/// The CPU affinity is **not** set on the returned Unix stream. To set that
/// use [`UnixStream::set_auto_cpu_affinity`].
#[allow(clippy::doc_markdown)] // For "io_uring".
pub const fn incoming(&self) -> Incoming<'_> {
pub fn incoming(&self) -> Incoming<'_> {
Incoming(self.fd.multishot_accept())
}

Expand Down
2 changes: 1 addition & 1 deletion rt/src/net/uds/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
//! * [`UnixStream`] represents a Unix stream socket.
//! * [`UnixDatagram`] represents a Unix datagram socket.

use std::mem::{size_of, MaybeUninit};
use std::mem::MaybeUninit;
use std::path::Path;
use std::{io, ptr};

Expand Down
1 change: 0 additions & 1 deletion rt/src/process/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

use std::cmp::Ordering;
use std::future::Future;
use std::mem::size_of_val;
use std::panic::{catch_unwind, AssertUnwindSafe};
use std::pin::Pin;
use std::task::{self, Poll};
Expand Down
2 changes: 1 addition & 1 deletion rt/src/scheduler/inactive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ const fn skip_bits(pid: ProcessId, depth: usize) -> usize {
mod tests {
use std::cmp::max;
use std::future::Future;
use std::mem::{align_of, size_of};
use std::mem::align_of;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
Expand Down
2 changes: 1 addition & 1 deletion rt/src/scheduler/shared/inactive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -694,7 +694,7 @@ unsafe fn drop_tagged_pointer(ptr: TaggedPointer) {
#[cfg(test)]
mod tests {
use std::future::Future;
use std::mem::{align_of, size_of};
use std::mem::align_of;
use std::pin::Pin;
use std::ptr;
use std::sync::atomic::{AtomicUsize, Ordering};
Expand Down
1 change: 0 additions & 1 deletion rt/src/scheduler/shared/runqueue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,6 @@ impl Node {
#[cfg(test)]
mod tests {
use std::future::Future;
use std::mem::size_of;
use std::pin::Pin;
use std::task::{self, Poll};
use std::time::Duration;
Expand Down
3 changes: 1 addition & 2 deletions rt/src/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -441,8 +441,7 @@ fn cpu_set(cpu: usize) -> libc::cpu_set_t {
#[cfg(target_os = "linux")]
fn set_affinity(cpu_set: &libc::cpu_set_t) -> io::Result<()> {
let thread = unsafe { libc::pthread_self() };
let res =
unsafe { libc::pthread_setaffinity_np(thread, std::mem::size_of_val(cpu_set), cpu_set) };
let res = unsafe { libc::pthread_setaffinity_np(thread, size_of_val(cpu_set), cpu_set) };
if res == 0 {
Ok(())
} else {
Expand Down
2 changes: 1 addition & 1 deletion rt/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -740,7 +740,7 @@ impl<RT> NewActor for TestAssertUnmovedNewActor<RT> {
#[cfg(test)]
#[track_caller]
pub(crate) fn assert_size<T>(expected: usize) {
assert_eq!(std::mem::size_of::<T>(), expected);
assert_eq!(size_of::<T>(), expected);
}

/// Supervisor to use in testing.
Expand Down
5 changes: 3 additions & 2 deletions rt/src/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ impl CoordinatorLog {
}

/// Returns the next stream counter.
#[allow(clippy::needless_pass_by_ref_mut)]
fn next_stream_count(&mut self) -> u32 {
// SAFETY: needs to sync with itself.
self.shared.counter.fetch_add(1, atomic::Ordering::AcqRel)
Expand Down Expand Up @@ -617,7 +618,7 @@ mod private {
}

fn write_attribute(&self, buf: &mut Vec<u8>) {
#[allow(trivial_numeric_casts)] // for `u64 as u64`, etc.
#[allow(trivial_numeric_casts, clippy::cast_lossless)] // for `u64 as u64`, etc.
let value = self.get() as $f_ty;
buf.extend_from_slice(&value.to_be_bytes());
}
Expand All @@ -632,7 +633,7 @@ mod private {
}

fn write_attribute(&self, buf: &mut Vec<u8>) {
#[allow(trivial_numeric_casts)] // for `u64 as u64`, etc.
#[allow(trivial_numeric_casts, clippy::cast_lossless)] // for `u64 as u64`, etc.
let value = *self as $f_ty;
buf.extend_from_slice(&value.to_be_bytes());
}
Expand Down
Loading