Revert "Drop dep tokio's io-util feat as it broke MSRV and isn't …
Browse files Browse the repository at this point in the history
…useful"

This reverts commit eb882a6.
luckysori committed Dec 8, 2023
1 parent 08c0c5f commit 121bc32
Showing 2 changed files with 41 additions and 35 deletions.
4 changes: 2 additions & 2 deletions lightning-net-tokio/Cargo.toml
@@ -17,8 +17,8 @@ rustdoc-args = ["--cfg", "docsrs"]
 [dependencies]
 bitcoin = "0.29.0"
 lightning = { version = "0.0.117", path = "../lightning" }
-tokio = { version = "1.0", features = [ "rt", "sync", "net", "time" ] }
+tokio = { version = "1.0", features = [ "io-util", "rt", "sync", "net", "time" ] }
 
 [dev-dependencies]
-tokio = { version = "1.14", features = [ "macros", "rt", "rt-multi-thread", "sync", "net", "time" ] }
+tokio = { version = "1.14", features = [ "io-util", "macros", "rt", "rt-multi-thread", "sync", "net", "time" ] }
 lightning = { version = "0.0.117", path = "../lightning", features = ["_test_utils"] }
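
For context: the restored io-util feature is what gates tokio's AsyncReadExt/AsyncWriteExt extension traits and tokio::io::split, all of which the lib.rs changes below depend on. A minimal sketch of that API surface (a hypothetical standalone example, not code from this repository):

use tokio::io::{self, AsyncReadExt, AsyncWriteExt}; // all gated on the "io-util" feature
use tokio::net::TcpStream;

async fn echo_once(stream: TcpStream) -> io::Result<()> {
	// split() yields independently owned read and write halves of one stream.
	let (mut reader, mut writer) = io::split(stream);
	let mut buf = [0u8; 4096];
	let len = reader.read(&mut buf).await?; // AsyncReadExt::read
	writer.write_all(&buf[..len]).await?; // AsyncWriteExt::write_all
	writer.shutdown().await // AsyncWriteExt::shutdown
}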
72 changes: 39 additions & 33 deletions lightning-net-tokio/src/lib.rs
@@ -32,8 +32,9 @@
 use bitcoin::secp256k1::PublicKey;
 
 use tokio::net::TcpStream;
-use tokio::time;
+use tokio::{io, time};
 use tokio::sync::mpsc;
+use tokio::io::{AsyncReadExt, AsyncWrite, AsyncWriteExt};
 
 use lightning::ln::peer_handler;
 use lightning::ln::peer_handler::SocketDescriptor as LnSocketTrait;
@@ -58,7 +59,7 @@ static ID_COUNTER: AtomicU64 = AtomicU64::new(0);
 // define a trivial two- and three- select macro with the specific types we need and just use that.
 
 pub(crate) enum SelectorOutput {
-	A(Option<()>), B(Option<()>), C(tokio::io::Result<()>),
+	A(Option<()>), B(Option<()>), C(tokio::io::Result<usize>),
 }
 
 pub(crate) struct TwoSelector<
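
(The selectors' poll bodies are collapsed in this view. For reference, both simply poll their fields in order and surface the first Ready value; a sketch of that shape for TwoSelector, assuming it holds a/b fields mirroring ThreeSelector's below:)

use std::future::Future;
use std::pin::Pin;
use std::task::{self, Poll};

impl<
	A: Future<Output=Option<()>> + Unpin, B: Future<Output=Option<()>> + Unpin,
> Future for TwoSelector<A, B> {
	type Output = SelectorOutput;
	fn poll(mut self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> Poll<SelectorOutput> {
		// Poll each held future in turn; the first Ready result wins.
		match Pin::new(&mut self.a).poll(ctx) {
			Poll::Ready(res) => return Poll::Ready(SelectorOutput::A(res)),
			Poll::Pending => {},
		}
		match Pin::new(&mut self.b).poll(ctx) {
			Poll::Ready(res) => return Poll::Ready(SelectorOutput::B(res)),
			Poll::Pending => {},
		}
		Poll::Pending
	}
}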
@@ -86,15 +87,15 @@ impl<
 }
 
 pub(crate) struct ThreeSelector<
-	A: Future<Output=Option<()>> + Unpin, B: Future<Output=Option<()>> + Unpin, C: Future<Output=tokio::io::Result<()>> + Unpin
+	A: Future<Output=Option<()>> + Unpin, B: Future<Output=Option<()>> + Unpin, C: Future<Output=tokio::io::Result<usize>> + Unpin
 > {
 	pub a: A,
 	pub b: B,
 	pub c: C,
 }
 
 impl<
-	A: Future<Output=Option<()>> + Unpin, B: Future<Output=Option<()>> + Unpin, C: Future<Output=tokio::io::Result<()>> + Unpin
+	A: Future<Output=Option<()>> + Unpin, B: Future<Output=Option<()>> + Unpin, C: Future<Output=tokio::io::Result<usize>> + Unpin
 > Future for ThreeSelector<A, B, C> {
 	type Output = SelectorOutput;
 	fn poll(mut self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> Poll<SelectorOutput> {
@@ -118,7 +119,7 @@ impl<
 /// Connection object (in an Arc<Mutex<>>) in each SocketDescriptor we create as well as in the
 /// read future (which is returned by schedule_read).
 struct Connection {
-	writer: Option<Arc<TcpStream>>,
+	writer: Option<io::WriteHalf<TcpStream>>,
 	// Because our PeerManager is templated by user-provided types, and we can't (as far as I can
 	// tell) have a const RawWakerVTable built out of templated functions, we need some indirection
 	// between being woken up with write-ready and calling PeerManager::write_buffer_space_avail.
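
The indirection described in the comment above boils down to a waker whose only job is to poke a type-erased channel; the untemplated future holding the receiver then calls into the generic PeerManager. A simplified sketch of that idea, using the newer std::task::Wake trait for brevity (this file instead hand-rolls a RawWakerVTable, which works on older compilers):

use std::sync::Arc;
use std::task::{Wake, Waker};
use tokio::sync::mpsc;

struct WriteAvailWaker(mpsc::Sender<()>);

impl Wake for WriteAvailWaker {
	fn wake(self: Arc<Self>) {
		// A full channel means a wakeup is already queued, so the error is safe to drop.
		let _ = self.0.try_send(());
	}
}

fn write_avail_waker(sender: mpsc::Sender<()>) -> Waker {
	Waker::from(Arc::new(WriteAvailWaker(sender)))
}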
@@ -155,7 +156,7 @@ impl Connection {
 	async fn schedule_read<PM: Deref + 'static + Send + Sync + Clone>(
 		peer_manager: PM,
 		us: Arc<Mutex<Self>>,
-		reader: Arc<TcpStream>,
+		mut reader: io::ReadHalf<TcpStream>,
 		mut read_wake_receiver: mpsc::Receiver<()>,
 		mut write_avail_receiver: mpsc::Receiver<()>,
 	) where PM::Target: APeerManager<Descriptor = SocketDescriptor> {
@@ -199,7 +200,7 @@ impl Connection {
 				ThreeSelector {
 					a: Box::pin(write_avail_receiver.recv()),
 					b: Box::pin(read_wake_receiver.recv()),
-					c: Box::pin(reader.readable()),
+					c: Box::pin(reader.read(&mut buf)),
 				}.await
 			};
 			match select_result {
@@ -210,9 +211,8 @@
 					}
 				},
 				SelectorOutput::B(_) => {},
-				SelectorOutput::C(res) => {
-					if res.is_err() { break Disconnect::PeerDisconnected; }
-					match reader.try_read(&mut buf) {
+				SelectorOutput::C(read) => {
+					match read {
 						Ok(0) => break Disconnect::PeerDisconnected,
 						Ok(len) => {
 							let read_res = peer_manager.as_ref().read_event(&mut our_descriptor, &buf[0..len]);
@@ -226,10 +226,6 @@
 							Err(_) => break Disconnect::CloseConnection,
 						}
 					},
-					Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
-						// readable() is allowed to spuriously wake, so we have to handle
-						// WouldBlock here.
-					},
 					Err(_) => break Disconnect::PeerDisconnected,
 				}
 			},
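
The deleted WouldBlock arm was only needed because TcpStream::readable() may wake spuriously; AsyncReadExt::read, restored above, resolves exclusively with data, EOF, or a real error. The resulting three-way handling, as a hypothetical standalone helper (not code from this commit):

use tokio::io::{AsyncReadExt, ReadHalf};
use tokio::net::TcpStream;

async fn read_step(reader: &mut ReadHalf<TcpStream>, buf: &mut [u8]) -> Option<usize> {
	match reader.read(buf).await {
		Ok(0) => None, // EOF: the peer disconnected
		Ok(len) => Some(len), // got len bytes to hand to read_event
		Err(_) => None, // hard error: treat as a disconnect
	}
}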
@@ -243,14 +239,18 @@
 			// here.
 			let _ = tokio::task::yield_now().await;
 		};
-		us.lock().unwrap().writer.take();
+		let writer_option = us.lock().unwrap().writer.take();
+		if let Some(mut writer) = writer_option {
+			// If the socket is already closed, shutdown() will fail, so just ignore it.
+			let _ = writer.shutdown().await;
+		}
 		if let Disconnect::PeerDisconnected = disconnect_type {
 			peer_manager.as_ref().socket_disconnected(&our_descriptor);
 			peer_manager.as_ref().process_events();
 		}
 	}
 
-	fn new(stream: StdTcpStream) -> (Arc<TcpStream>, mpsc::Receiver<()>, mpsc::Receiver<()>, Arc<Mutex<Self>>) {
+	fn new(stream: StdTcpStream) -> (io::ReadHalf<TcpStream>, mpsc::Receiver<()>, mpsc::Receiver<()>, Arc<Mutex<Self>>) {
 		// We only ever need a channel of depth 1 here: if we returned a non-full write to the
 		// PeerManager, we will eventually get notified that there is room in the socket to write
 		// new bytes, which will generate an event. That event will be popped off the queue before
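
The shutdown path added in this hunk gives the write half a graceful close: AsyncWriteExt::shutdown flushes buffered data and closes the write side, and on an already-dead socket it simply errors, hence the ignored result. The same shape as a hypothetical standalone helper:

use tokio::io::{AsyncWriteExt, WriteHalf};
use tokio::net::TcpStream;

async fn close_writer(writer: Option<WriteHalf<TcpStream>>) {
	if let Some(mut writer) = writer {
		// Flush and close the write side; failure just means the socket is already gone.
		let _ = writer.shutdown().await;
	}
}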
@@ -262,11 +262,11 @@
 		// false.
 		let (read_waker, read_receiver) = mpsc::channel(1);
 		stream.set_nonblocking(true).unwrap();
-		let tokio_stream = Arc::new(TcpStream::from_std(stream).unwrap());
+		let (reader, writer) = io::split(TcpStream::from_std(stream).unwrap());
 
-		(Arc::clone(&tokio_stream), write_receiver, read_receiver,
+		(reader, write_receiver, read_receiver,
 			Arc::new(Mutex::new(Self {
-				writer: Some(tokio_stream), write_avail, read_waker, read_paused: false,
+				writer: Some(writer), write_avail, read_waker, read_paused: false,
 				rl_requested_disconnect: false,
 				id: ID_COUNTER.fetch_add(1, Ordering::AcqRel)
 			})))
@@ -462,9 +462,9 @@ impl SocketDescriptor {
 }
 impl peer_handler::SocketDescriptor for SocketDescriptor {
	fn send_data(&mut self, data: &[u8], resume_read: bool) -> usize {
-		// To send data, we take a lock on our Connection to access the TcpStream, writing to it if
-		// there's room in the kernel buffer, or otherwise create a new Waker with a
-		// SocketDescriptor in it which can wake up the write_avail Sender, waking up the
+		// To send data, we take a lock on our Connection to access the WriteHalf of the TcpStream,
+		// writing to it if there's room in the kernel buffer, or otherwise create a new Waker with
+		// a SocketDescriptor in it which can wake up the write_avail Sender, waking up the
 		// processing future which will call write_buffer_space_avail and we'll end up back here.
 		let mut us = self.conn.lock().unwrap();
 		if us.writer.is_none() {
@@ -484,18 +484,24 @@
 		let mut ctx = task::Context::from_waker(&waker);
 		let mut written_len = 0;
 		loop {
-			match us.writer.as_ref().unwrap().poll_write_ready(&mut ctx) {
-				task::Poll::Ready(Ok(())) => {
-					match us.writer.as_ref().unwrap().try_write(&data[written_len..]) {
-						Ok(res) => {
-							debug_assert_ne!(res, 0);
-							written_len += res;
-							if written_len == data.len() { return written_len; }
-						},
-						Err(_) => return written_len,
-					}
-				},
-				task::Poll::Ready(Err(_)) => return written_len,
+			match std::pin::Pin::new(us.writer.as_mut().unwrap()).poll_write(&mut ctx, &data[written_len..]) {
+				task::Poll::Ready(Ok(res)) => {
+					// The tokio docs *seem* to indicate this can't happen, and I certainly don't
+					// know how to handle it if it does (cause it should be a Poll::Pending
+					// instead):
+					assert_ne!(res, 0);
+					written_len += res;
+					if written_len == data.len() { return written_len; }
+				},
+				task::Poll::Ready(Err(e)) => {
+					// The tokio docs *seem* to indicate this can't happen, and I certainly don't
+					// know how to handle it if it does (cause it should be a Poll::Pending
+					// instead):
+					assert_ne!(e.kind(), io::ErrorKind::WouldBlock);
+					// Probably we've already been closed, just return what we have and let the
+					// read thread handle closing logic.
+					return written_len;
+				},
 				task::Poll::Pending => {
 					// We're queued up for a write event now, but we need to make sure we also
 					// pause read given we're now waiting on the remote end to ACK (and in
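
For reference, the loop above drives an async writer from synchronous code: it builds a task::Context around the custom waker and calls poll_write directly. A distilled sketch of that pattern (a hypothetical helper, generic over any AsyncWrite + Unpin writer):

use std::pin::Pin;
use std::task::{Context, Poll, Waker};
use tokio::io::AsyncWrite;

fn try_write_now<W: AsyncWrite + Unpin>(writer: &mut W, data: &[u8], waker: &Waker) -> Option<usize> {
	let mut ctx = Context::from_waker(waker);
	match Pin::new(writer).poll_write(&mut ctx, data) {
		Poll::Ready(Ok(len)) => Some(len), // len bytes accepted by the kernel
		Poll::Ready(Err(_)) => Some(0), // likely already closed; caller gives up
		Poll::Pending => None, // the waker will fire once the socket is writable
	}
}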
