Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Getting rid of lock().unwrap() #33

Merged
merged 3 commits into from
Mar 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 31 additions & 45 deletions src/raw_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -452,13 +452,7 @@ impl<S: Read + Write> RawClient<S> {
// might have already received a response for that id, but we don't know it
// yet. Exiting here forces the calling code to fallback to the sender-receiver
// method, and it should find a message there waiting for it.
if self
.waiting_map
.lock()
.unwrap()
.get(&until_message)
.is_none()
{
if self.waiting_map.lock()?.get(&until_message).is_none() {
return Err(Error::CouldntLockReader);
}
}
Expand All @@ -470,8 +464,7 @@ impl<S: Read + Write> RawClient<S> {
if let Err(e) = reader.read_line(&mut raw_resp) {
let error = Arc::new(e);
for (_, s) in self.waiting_map.lock().unwrap().drain() {
s.send(ChannelMessage::Error(error.clone()))
.expect("Unable to send ChannelMessage::Error");
s.send(ChannelMessage::Error(error.clone()))?;
}
return Err(Error::SharedIOError(error));
}
Expand All @@ -494,15 +487,22 @@ impl<S: Read + Write> RawClient<S> {
);

// Remove ourselves from the "waiting map"
let mut map = self.waiting_map.lock().unwrap();
let mut map = self.waiting_map.lock()?;
map.remove(&resp_id);

// If the map is not empty, we select a random thread to become the
// new reader thread.
if let Some(sender) = map.values().next() {
if let Some(err) = map.values().find_map(|sender| {
sender
.send(ChannelMessage::WakeUp)
.expect("Unable to WakeUp a different thread");
.map_err(|err| {
warn!("Unable to wake up a thread, trying some other");
err
})
.err()
}) {
error!("All the threads has failed, giving up");
return Err(err)?;
}

break Ok(resp);
Expand All @@ -512,11 +512,8 @@ impl<S: Read + Write> RawClient<S> {
// move on
trace!("Reader thread received response for {}", resp_id);

if let Some(sender) = self.waiting_map.lock().unwrap().remove(&resp_id)
{
sender
.send(ChannelMessage::Response(resp))
.expect("Unable to send the response");
if let Some(sender) = self.waiting_map.lock()?.remove(&resp_id) {
sender.send(ChannelMessage::Response(resp))?;
} else {
warn!("Missing listener for {}", resp_id);
}
Expand All @@ -539,9 +536,7 @@ impl<S: Read + Write> RawClient<S> {
// running somewhere.
Err(Error::CouldntLockReader)
}
e @ Err(TryLockError::Poisoned(_)) => e
.map(|_| Ok(serde_json::Value::Null))
.expect("Poisoned reader mutex"), // panic if the reader mutex has been poisoned
Err(TryLockError::Poisoned(e)) => Err(e)?,
};

let resp = resp?;
Expand All @@ -556,13 +551,13 @@ impl<S: Read + Write> RawClient<S> {
// Add our listener to the map before we send the request, to make sure we don't get a
// reply before the receiver is added
let (sender, receiver) = channel();
self.waiting_map.lock().unwrap().insert(req.id, sender);
self.waiting_map.lock()?.insert(req.id, sender);

let mut raw = serde_json::to_vec(&req)?;
trace!("==> {}", String::from_utf8_lossy(&raw));

raw.extend_from_slice(b"\n");
let mut stream = self.stream.lock().unwrap();
let mut stream = self.stream.lock()?;
stream.write_all(&raw)?;
stream.flush()?;
drop(stream); // release the lock
Expand All @@ -574,7 +569,7 @@ impl<S: Read + Write> RawClient<S> {
e @ Err(_) => {
// In case of error our sender could still be left in the map, depending on where
// the error happened. Just in case, try to remove it here
self.waiting_map.lock().unwrap().remove(&req.id);
self.waiting_map.lock()?.remove(&req.id);
return e;
}
};
Expand All @@ -592,22 +587,21 @@ impl<S: Read + Write> RawClient<S> {
match self._reader_thread(Some(req_id)) {
Ok(response) => break Ok(response),
Err(Error::CouldntLockReader) => {
match receiver.recv() {
match receiver.recv()? {
// Received our response, returning it
Ok(ChannelMessage::Response(received)) => break Ok(received),
Ok(ChannelMessage::WakeUp) => {
ChannelMessage::Response(received) => break Ok(received),
ChannelMessage::WakeUp => {
// We have been woken up, this means that we should try becoming the
// reader thread ourselves
trace!("WakeUp for {}", req_id);

continue;
}
Ok(ChannelMessage::Error(e)) => {
ChannelMessage::Error(e) => {
warn!("Received ChannelMessage::Error");

break Err(Error::SharedIOError(e));
}
e @ Err(_) => e.map(|_| ()).expect("Error receiving from channel"), // panic if there's something wrong with the channels
}
}
e @ Err(_) => break e,
Expand All @@ -617,14 +611,14 @@ impl<S: Read + Write> RawClient<S> {

fn handle_notification(&self, method: &str, result: serde_json::Value) -> Result<(), Error> {
match method {
"blockchain.headers.subscribe" => self.headers.lock().unwrap().append(
"blockchain.headers.subscribe" => self.headers.lock()?.append(
&mut serde_json::from_value::<Vec<RawHeaderNotification>>(result)?
.into_iter()
.collect(),
),
"blockchain.scripthash.subscribe" => {
let unserialized: ScriptNotification = serde_json::from_value(result)?;
let mut script_notifications = self.script_notifications.lock().unwrap();
let mut script_notifications = self.script_notifications.lock()?;

let queue = script_notifications
.get_mut(&unserialized.scripthash)
Expand Down Expand Up @@ -668,10 +662,7 @@ impl<T: Read + Write> ElectrumApi for RawClient<T> {
);
missing_responses.insert(req.id);

self.waiting_map
.lock()
.unwrap()
.insert(req.id, sender.clone());
self.waiting_map.lock()?.insert(req.id, sender.clone());

raw.append(&mut serde_json::to_vec(&req)?);
raw.extend_from_slice(b"\n");
Expand All @@ -683,7 +674,7 @@ impl<T: Read + Write> ElectrumApi for RawClient<T> {

trace!("==> {}", String::from_utf8_lossy(&raw));

let mut stream = self.stream.lock().unwrap();
let mut stream = self.stream.lock()?;
stream.write_all(&raw)?;
stream.flush()?;
drop(stream); // release the lock
Expand All @@ -699,7 +690,7 @@ impl<T: Read + Write> ElectrumApi for RawClient<T> {
// the error happened. Just in case, try to remove it here
warn!("got error for req_id {}: {:?}", req_id, e);
warn!("removing all waiting req of this batch");
let mut guard = self.waiting_map.lock().unwrap();
let mut guard = self.waiting_map.lock()?;
for req_id in missing_responses.iter() {
guard.remove(&req_id);
}
Expand Down Expand Up @@ -730,7 +721,7 @@ impl<T: Read + Write> ElectrumApi for RawClient<T> {
}

fn block_headers_pop_raw(&self) -> Result<Option<RawHeaderNotification>, Error> {
Ok(self.headers.lock().unwrap().pop_front())
Ok(self.headers.lock()?.pop_front())
}

fn block_header_raw(&self, height: usize) -> Result<Vec<u8>, Error> {
Expand Down Expand Up @@ -796,7 +787,7 @@ impl<T: Read + Write> ElectrumApi for RawClient<T> {

fn script_subscribe(&self, script: &Script) -> Result<Option<ScriptStatus>, Error> {
let script_hash = script.to_electrum_scripthash();
let mut script_notifications = self.script_notifications.lock().unwrap();
let mut script_notifications = self.script_notifications.lock()?;

if script_notifications.contains_key(&script_hash) {
return Err(Error::AlreadySubscribed(script_hash));
Expand All @@ -816,7 +807,7 @@ impl<T: Read + Write> ElectrumApi for RawClient<T> {

fn script_unsubscribe(&self, script: &Script) -> Result<bool, Error> {
let script_hash = script.to_electrum_scripthash();
let mut script_notifications = self.script_notifications.lock().unwrap();
let mut script_notifications = self.script_notifications.lock()?;

if !script_notifications.contains_key(&script_hash) {
return Err(Error::NotSubscribed(script_hash));
Expand All @@ -838,12 +829,7 @@ impl<T: Read + Write> ElectrumApi for RawClient<T> {
fn script_pop(&self, script: &Script) -> Result<Option<ScriptStatus>, Error> {
let script_hash = script.to_electrum_scripthash();

match self
.script_notifications
.lock()
.unwrap()
.get_mut(&script_hash)
{
match self.script_notifications.lock()?.get_mut(&script_hash) {
None => Err(Error::NotSubscribed(script_hash)),
Some(queue) => Ok(queue.pop_front()),
}
Expand Down
25 changes: 22 additions & 3 deletions src/stream.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use log::error;
use std::io::{self, Read, Write};
use std::sync::{Arc, Mutex};

Expand All @@ -6,17 +7,35 @@ pub struct ClonableStream<T: Read + Write>(Arc<Mutex<T>>);

impl<T: Read + Write> Read for ClonableStream<T> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.0.lock().unwrap().read(buf)
self.0
.lock()
.map_err(|_| {
error!("Unable to acquire lock on ClonableStream read operation");
io::Error::from(io::ErrorKind::BrokenPipe)
})?
.read(buf)
}
}

impl<T: Read + Write> Write for ClonableStream<T> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.0.lock().unwrap().write(buf)
self.0
.lock()
.map_err(|_| {
error!("Unable to acquire lock on ClonableStream write operation");
io::Error::from(io::ErrorKind::BrokenPipe)
})?
.write(buf)
}

fn flush(&mut self) -> io::Result<()> {
self.0.lock().unwrap().flush()
self.0
.lock()
.map_err(|_| {
error!("Unable to acquire lock on ClonableStream flush operation");
io::Error::from(io::ErrorKind::BrokenPipe)
})?
.flush()
}
}

Expand Down
21 changes: 21 additions & 0 deletions src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,8 @@ pub enum Error {
/// Couldn't take a lock on the reader mutex. This means that there's already another reader
/// thread running
CouldntLockReader,
/// Broken IPC communication channel: the other thread probably has exited
Mpsc,

#[cfg(feature = "use-openssl")]
/// Invalid OpenSSL method used
Expand Down Expand Up @@ -342,6 +344,7 @@ impl Display for Error {
Error::MissingDomain => f.write_str("Missing domain while it was explicitly asked to validate it"),
Error::BothSocksAndTimeout => f.write_str("Setting both a proxy and a timeout in `Config` is an error"),
Error::CouldntLockReader => f.write_str("Couldn't take a lock on the reader mutex. This means that there's already another reader thread is running"),
Error::Mpsc => f.write_str("Broken IPC communication channel: the other thread probably has exited"),
}
}
}
Expand All @@ -362,3 +365,21 @@ impl_error!(std::io::Error, IOError);
impl_error!(serde_json::Error, JSON);
impl_error!(bitcoin::hashes::hex::Error, Hex);
impl_error!(bitcoin::consensus::encode::Error, Bitcoin);

impl<T> From<std::sync::PoisonError<T>> for Error {
fn from(_: std::sync::PoisonError<T>) -> Self {
Error::IOError(std::io::Error::from(std::io::ErrorKind::BrokenPipe))
}
}

impl<T> From<std::sync::mpsc::SendError<T>> for Error {
fn from(_: std::sync::mpsc::SendError<T>) -> Self {
Error::Mpsc
}
}

impl From<std::sync::mpsc::RecvError> for Error {
fn from(_: std::sync::mpsc::RecvError) -> Self {
Error::Mpsc
}
}