Skip to content

Commit

Permalink
Merge pull request #2141 from TheBlueMatt/2023-03-fuck-rust
Browse files Browse the repository at this point in the history
Drop `futures` dependency from `lightning-block-sync`
  • Loading branch information
wpaulino authored Mar 31, 2023
2 parents 783e818 + 491100d commit 0e28bcb
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 21 deletions.
3 changes: 1 addition & 2 deletions lightning-background-processor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ all-features = true
rustdoc-args = ["--cfg", "docsrs"]

[features]
futures = [ "futures-util" ]
futures = [ ]
std = ["lightning/std", "lightning-rapid-gossip-sync/std"]

default = ["std"]
Expand All @@ -23,7 +23,6 @@ default = ["std"]
bitcoin = { version = "0.29.0", default-features = false }
lightning = { version = "0.0.114", path = "../lightning", default-features = false }
lightning-rapid-gossip-sync = { version = "0.0.114", path = "../lightning-rapid-gossip-sync", default-features = false }
futures-util = { version = "0.3", default-features = false, features = ["async-await-macro"], optional = true }

[dev-dependencies]
lightning = { version = "0.0.114", path = "../lightning", features = ["_test_utils"] }
Expand Down
60 changes: 53 additions & 7 deletions lightning-background-processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
#![deny(private_intra_doc_links)]

#![deny(missing_docs)]
#![deny(unsafe_code)]
#![cfg_attr(not(feature = "futures"), deny(unsafe_code))]

#![cfg_attr(docsrs, feature(doc_auto_cfg))]

Expand Down Expand Up @@ -52,8 +52,6 @@ use std::thread::{self, JoinHandle};
#[cfg(feature = "std")]
use std::time::Instant;

#[cfg(feature = "futures")]
use futures_util::{select_biased, future::FutureExt, task};
#[cfg(not(feature = "std"))]
use alloc::vec::Vec;

Expand Down Expand Up @@ -384,6 +382,50 @@ macro_rules! define_run_body {
} }
}

#[cfg(feature = "futures")]
pub(crate) mod futures_util {
use core::future::Future;
use core::task::{Poll, Waker, RawWaker, RawWakerVTable};
use core::pin::Pin;
use core::marker::Unpin;
pub(crate) struct Selector<A: Future<Output=()> + Unpin, B: Future<Output=bool> + Unpin> {
pub a: A,
pub b: B,
}
pub(crate) enum SelectorOutput {
A, B(bool),
}

impl<A: Future<Output=()> + Unpin, B: Future<Output=bool> + Unpin> Future for Selector<A, B> {
type Output = SelectorOutput;
fn poll(mut self: Pin<&mut Self>, ctx: &mut core::task::Context<'_>) -> Poll<SelectorOutput> {
match Pin::new(&mut self.a).poll(ctx) {
Poll::Ready(()) => { return Poll::Ready(SelectorOutput::A); },
Poll::Pending => {},
}
match Pin::new(&mut self.b).poll(ctx) {
Poll::Ready(res) => { return Poll::Ready(SelectorOutput::B(res)); },
Poll::Pending => {},
}
Poll::Pending
}
}

// If we want to poll a future without an async context to figure out if it has completed or
// not without awaiting, we need a Waker, which needs a vtable...we fill it with dummy values
// but sadly there's a good bit of boilerplate here.
fn dummy_waker_clone(_: *const ()) -> RawWaker { RawWaker::new(core::ptr::null(), &DUMMY_WAKER_VTABLE) }
fn dummy_waker_action(_: *const ()) { }

const DUMMY_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
dummy_waker_clone, dummy_waker_action, dummy_waker_action, dummy_waker_action);
pub(crate) fn dummy_waker() -> Waker { unsafe { Waker::from_raw(RawWaker::new(core::ptr::null(), &DUMMY_WAKER_VTABLE)) } }
}
#[cfg(feature = "futures")]
use futures_util::{Selector, SelectorOutput, dummy_waker};
#[cfg(feature = "futures")]
use core::task;

/// Processes background events in a future.
///
/// `sleeper` should return a future which completes in the given amount of time and returns a
Expand Down Expand Up @@ -470,16 +512,20 @@ where
chain_monitor, chain_monitor.process_pending_events_async(async_event_handler).await,
channel_manager, channel_manager.process_pending_events_async(async_event_handler).await,
gossip_sync, peer_manager, logger, scorer, should_break, {
select_biased! {
_ = channel_manager.get_persistable_update_future().fuse() => true,
exit = sleeper(Duration::from_millis(100)).fuse() => {
let fut = Selector {
a: channel_manager.get_persistable_update_future(),
b: sleeper(Duration::from_millis(100)),
};
match fut.await {
SelectorOutput::A => true,
SelectorOutput::B(exit) => {
should_break = exit;
false
}
}
}, |t| sleeper(Duration::from_secs(t)),
|fut: &mut SleepFuture, _| {
let mut waker = task::noop_waker();
let mut waker = dummy_waker();
let mut ctx = task::Context::from_waker(&mut waker);
core::pin::Pin::new(fut).poll(&mut ctx).is_ready()
})
Expand Down
1 change: 0 additions & 1 deletion lightning-block-sync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ rpc-client = [ "serde_json", "chunked_transfer" ]
[dependencies]
bitcoin = "0.29.0"
lightning = { version = "0.0.114", path = "../lightning" }
futures-util = { version = "0.3" }
tokio = { version = "1.0", features = [ "io-util", "net", "time" ], optional = true }
serde_json = { version = "1.0", optional = true }
chunked_transfer = { version = "1.4", optional = true }
Expand Down
14 changes: 8 additions & 6 deletions lightning-block-sync/src/rest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,32 +7,34 @@ use crate::http::{BinaryResponse, HttpEndpoint, HttpClient, JsonResponse};
use bitcoin::hash_types::BlockHash;
use bitcoin::hashes::hex::ToHex;

use futures_util::lock::Mutex;

use std::convert::TryFrom;
use std::convert::TryInto;
use std::sync::Mutex;

/// A simple REST client for requesting resources using HTTP `GET`.
pub struct RestClient {
endpoint: HttpEndpoint,
client: Mutex<HttpClient>,
client: Mutex<Option<HttpClient>>,
}

impl RestClient {
/// Creates a new REST client connected to the given endpoint.
///
/// The endpoint should contain the REST path component (e.g., http://127.0.0.1:8332/rest).
pub fn new(endpoint: HttpEndpoint) -> std::io::Result<Self> {
let client = Mutex::new(HttpClient::connect(&endpoint)?);
Ok(Self { endpoint, client })
Ok(Self { endpoint, client: Mutex::new(None) })
}

/// Requests a resource encoded in `F` format and interpreted as type `T`.
pub async fn request_resource<F, T>(&self, resource_path: &str) -> std::io::Result<T>
where F: TryFrom<Vec<u8>, Error = std::io::Error> + TryInto<T, Error = std::io::Error> {
let host = format!("{}:{}", self.endpoint.host(), self.endpoint.port());
let uri = format!("{}/{}", self.endpoint.path().trim_end_matches("/"), resource_path);
self.client.lock().await.get::<F>(&uri, &host).await?.try_into()
let mut client = if let Some(client) = self.client.lock().unwrap().take() { client }
else { HttpClient::connect(&self.endpoint)? };
let res = client.get::<F>(&uri, &host).await?.try_into();
*self.client.lock().unwrap() = Some(client);
res
}
}

Expand Down
14 changes: 9 additions & 5 deletions lightning-block-sync/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::http::{HttpClient, HttpEndpoint, HttpError, JsonResponse};
use bitcoin::hash_types::BlockHash;
use bitcoin::hashes::hex::ToHex;

use futures_util::lock::Mutex;
use std::sync::Mutex;

use serde_json;

Expand Down Expand Up @@ -41,7 +41,7 @@ impl Error for RpcError {}
pub struct RpcClient {
basic_auth: String,
endpoint: HttpEndpoint,
client: Mutex<HttpClient>,
client: Mutex<Option<HttpClient>>,
id: AtomicUsize,
}

Expand All @@ -50,11 +50,10 @@ impl RpcClient {
/// credentials should be a base64 encoding of a user name and password joined by a colon, as is
/// required for HTTP basic access authentication.
pub fn new(credentials: &str, endpoint: HttpEndpoint) -> std::io::Result<Self> {
let client = Mutex::new(HttpClient::connect(&endpoint)?);
Ok(Self {
basic_auth: "Basic ".to_string() + credentials,
endpoint,
client,
client: Mutex::new(None),
id: AtomicUsize::new(0),
})
}
Expand All @@ -73,7 +72,12 @@ impl RpcClient {
"id": &self.id.fetch_add(1, Ordering::AcqRel).to_string()
});

let mut response = match self.client.lock().await.post::<JsonResponse>(&uri, &host, &self.basic_auth, content).await {
let mut client = if let Some(client) = self.client.lock().unwrap().take() { client }
else { HttpClient::connect(&self.endpoint)? };
let http_response = client.post::<JsonResponse>(&uri, &host, &self.basic_auth, content).await;
*self.client.lock().unwrap() = Some(client);

let mut response = match http_response {
Ok(JsonResponse(response)) => response,
Err(e) if e.kind() == std::io::ErrorKind::Other => {
match e.get_ref().unwrap().downcast_ref::<HttpError>() {
Expand Down

0 comments on commit 0e28bcb

Please sign in to comment.