Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Propagate JSON-RPC errors through the Rust subscription #929

Merged
merged 11 commits into from
Jul 19, 2021
2 changes: 2 additions & 0 deletions .changelog/unreleased/bug-fixes/930-newblock-event-parse.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
- Fix bug where `NewBlock` events emitted by Tendermint could not be parsed because of a missing field ([#930](https://github.com/informalsystems/tendermint-rs/issues/930))
romac marked this conversation as resolved.
Show resolved Hide resolved

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
- `[tendermint-rpc]` Propagate JSON-RPC errors through the Rust subscription ([#932](https://github.com/informalsystems/tendermint-rs/issues/932))
6 changes: 3 additions & 3 deletions rpc/src/client/transport/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ impl MockClientDriver {
DriverCommand::Unsubscribe { query, result_tx } => {
self.unsubscribe(query, result_tx);
}
DriverCommand::Publish(event) => self.publish(event.as_ref()),
DriverCommand::Publish(event) => self.publish(*event),
DriverCommand::Terminate => return Ok(()),
}
}
Expand All @@ -179,8 +179,8 @@ impl MockClientDriver {
result_tx.send(Ok(())).unwrap();
}

fn publish(&mut self, event: &Event) {
self.router.publish(event);
fn publish(&mut self, event: Event) {
self.router.publish_event(event);
}
}

Expand Down
79 changes: 60 additions & 19 deletions rpc/src/client/transport/router.rs
Original file line number Diff line number Diff line change
@@ -1,52 +1,91 @@
//! Event routing for subscriptions.

use crate::client::subscription::SubscriptionTx;
use crate::event::Event;
use std::borrow::BorrowMut;
use std::collections::{HashMap, HashSet};

use tracing::debug;

use crate::client::subscription::SubscriptionTx;
use crate::error::Error;
use crate::event::Event;

pub type SubscriptionQuery = String;
pub type SubscriptionId = String;

#[cfg_attr(not(feature = "websocket"), allow(dead_code))]
pub type SubscriptionIdRef<'a> = &'a str;

/// Provides a mechanism for tracking [`Subscription`]s and routing [`Event`]s
/// to those subscriptions.
///
/// [`Subscription`]: struct.Subscription.html
/// [`Event`]: ./event/struct.Event.html
#[derive(Debug)]
pub struct SubscriptionRouter {
// A map of subscription queries to collections of subscription IDs and
// their result channels. Used for publishing events relating to a specific
// query.
subscriptions: HashMap<String, HashMap<String, SubscriptionTx>>,
/// A map of subscription queries to collections of subscription IDs and
/// their result channels. Used for publishing events relating to a specific
/// query.
subscriptions: HashMap<SubscriptionQuery, HashMap<SubscriptionId, SubscriptionTx>>,
}

impl SubscriptionRouter {
/// Publishes the given error to all of the subscriptions to which the
/// error is relevant, based on the given subscription id query.
#[cfg_attr(not(feature = "websocket"), allow(dead_code))]
pub fn publish_error(&mut self, id: SubscriptionIdRef<'_>, err: Error) -> PublishResult {
if let Some(query) = self.subscription_query(id).cloned() {
self.publish(query, Err(err))
} else {
PublishResult::NoSubscribers
}
}

/// Get the query associated with the given subscription.
#[cfg_attr(not(feature = "websocket"), allow(dead_code))]
fn subscription_query(&self, id: SubscriptionIdRef<'_>) -> Option<&SubscriptionQuery> {
for (query, subs) in &self.subscriptions {
if subs.contains_key(id) {
return Some(query);
}
}

None
}

/// Publishes the given event to all of the subscriptions to which the
/// event is relevant. At present, it matches purely based on the query
/// associated with the event, and only queries that exactly match that of
/// the event's.
pub fn publish(&mut self, ev: &Event) -> PublishResult {
let subs_for_query = match self.subscriptions.get_mut(&ev.query) {
/// event is relevant, based on the associated query.
#[cfg_attr(not(feature = "websocket"), allow(dead_code))]
pub fn publish_event(&mut self, ev: Event) -> PublishResult {
self.publish(ev.query.clone(), Ok(ev))
}

/// Publishes the given event/error to all of the subscriptions to which the
/// event/error is relevant, based on the given query.
pub fn publish(&mut self, query: SubscriptionQuery, ev: Result<Event, Error>) -> PublishResult {
let subs_for_query = match self.subscriptions.get_mut(&query) {
Some(s) => s,
None => return PublishResult::NoSubscribers,
};

// We assume here that any failure to publish an event is an indication
// that the receiver end of the channel has been dropped, which allows
// us to safely stop tracking the subscription.
let mut disconnected = HashSet::new();
for (id, event_tx) in subs_for_query.borrow_mut() {
if let Err(e) = event_tx.send(Ok(ev.clone())) {
for (id, event_tx) in subs_for_query.iter_mut() {
if let Err(e) = event_tx.send(ev.clone()) {
disconnected.insert(id.clone());
debug!(
"Automatically disconnecting subscription with ID {} for query \"{}\" due to failure to publish to it: {}",
id, ev.query, e
id, query, e
);
}
}

for id in disconnected {
subs_for_query.remove(&id);
}

if subs_for_query.is_empty() {
PublishResult::AllDisconnected
PublishResult::AllDisconnected(query)
} else {
PublishResult::Success
}
Expand All @@ -63,6 +102,7 @@ impl SubscriptionRouter {
self.subscriptions.get_mut(&query).unwrap()
}
};

subs_for_query.insert(id.to_string(), tx);
}

Expand Down Expand Up @@ -98,7 +138,8 @@ impl Default for SubscriptionRouter {
pub enum PublishResult {
Success,
NoSubscribers,
AllDisconnected,
// All subscriptions for the given query have disconnected.
AllDisconnected(String),
romac marked this conversation as resolved.
Show resolved Hide resolved
}

#[cfg(test)]
Expand Down Expand Up @@ -160,7 +201,7 @@ mod test {

let mut ev = read_event("event_new_block_1").await;
ev.query = "query1".into();
router.publish(&ev);
router.publish_event(ev.clone());

let subs1_ev = must_recv(&mut subs1_event_rx, 500).await.unwrap();
let subs2_ev = must_recv(&mut subs2_event_rx, 500).await.unwrap();
Expand All @@ -169,7 +210,7 @@ mod test {
assert_eq!(ev, subs2_ev);

ev.query = "query2".into();
router.publish(&ev);
router.publish_event(ev.clone());

must_not_recv(&mut subs1_event_rx, 50).await;
must_not_recv(&mut subs2_event_rx, 50).await;
Expand Down
45 changes: 39 additions & 6 deletions rpc/src/client/transport/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ use tendermint::net;
use tokio::time::{Duration, Instant};
use tracing::{debug, error};

use super::router::{SubscriptionId, SubscriptionIdRef};

// WebSocket connection times out if we haven't heard anything at all from the
// server in this long.
//
Expand Down Expand Up @@ -504,7 +506,7 @@ pub struct WebSocketClientDriver {
cmd_rx: ChannelRx<DriverCommand>,
// Commands we've received but have not yet completed, indexed by their ID.
// A Terminate command is executed immediately.
pending_commands: HashMap<String, DriverCommand>,
pending_commands: HashMap<SubscriptionId, DriverCommand>,
}

impl WebSocketClientDriver {
Expand Down Expand Up @@ -650,38 +652,69 @@ impl WebSocketClientDriver {
return Ok(());
}

let wrapper = match serde_json::from_str::<response::Wrapper<GenericJsonResponse>>(&msg) {
let wrapper: response::Wrapper<GenericJsonResponse> = match serde_json::from_str(&msg) {
Ok(w) => w,
Err(e) => {
error!(
"Failed to deserialize incoming message as a JSON-RPC message: {}",
e
);

debug!("JSON-RPC message: {}", msg);

return Ok(());
}
};

debug!("Generic JSON-RPC message: {:?}", wrapper);

let id = wrapper.id().to_string();

if let Some(e) = wrapper.into_error() {
self.publish_error(&id, e).await;
}

if let Some(pending_cmd) = self.pending_commands.remove(&id) {
return self.respond_to_pending_command(pending_cmd, msg).await;
self.respond_to_pending_command(pending_cmd, msg).await?;
};

// We ignore incoming messages whose ID we don't recognize (could be
// relating to a fire-and-forget unsubscribe request - see the
// publish_event() method below).
Ok(())
}

async fn publish_error(&mut self, id: SubscriptionIdRef<'_>, err: Error) {
if let PublishResult::AllDisconnected(query) = self.router.publish_error(id, err) {
debug!(
"All subscribers for query \"{}\" have disconnected. Unsubscribing from query...",
query
);

// If all subscribers have disconnected for this query, we need to
// unsubscribe from it. We issue a fire-and-forget unsubscribe
// message.
if let Err(e) = self
.send_request(Wrapper::new(unsubscribe::Request::new(query)))
.await
{
error!("Failed to send unsubscribe request: {}", e);
}
}
}

async fn publish_event(&mut self, ev: Event) {
if let PublishResult::AllDisconnected = self.router.publish(&ev) {
if let PublishResult::AllDisconnected(query) = self.router.publish_event(ev) {
debug!(
"All subscribers for query \"{}\" have disconnected. Unsubscribing from query...",
ev.query
query
);

// If all subscribers have disconnected for this query, we need to
// unsubscribe from it. We issue a fire-and-forget unsubscribe
// message.
if let Err(e) = self
.send_request(Wrapper::new(unsubscribe::Request::new(ev.query.clone())))
.send_request(Wrapper::new(unsubscribe::Request::new(query)))
.await
{
error!("Failed to send unsubscribe request: {}", e);
Expand Down
5 changes: 5 additions & 0 deletions rpc/src/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ where
&self.id
}

/// Convert this wrapper into the underlying error, if any
pub fn into_error(self) -> Option<Error> {
self.error
}

/// Convert this wrapper into a result type
pub fn into_result(self) -> Result<R, Error> {
// Ensure we're using a supported RPC version
Expand Down
6 changes: 5 additions & 1 deletion rpc/tests/kvstore_fixtures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -670,6 +670,10 @@ fn incoming_fixtures() {
assert_eq!(result.genesis.validators[0].power(), 10);
assert!(result.genesis.validators[0].pub_key.ed25519().is_some());
assert_eq!(result.genesis.validators[0].proposer_priority.value(), 0);
assert_eq!(
result.genesis.consensus_params.block.time_iota_ms,
tendermint::block::Size::default_time_iota_ms(),
);
}
"net_info" => {
let result = endpoint::net_info::Response::from_string(content).unwrap();
Expand Down Expand Up @@ -703,7 +707,7 @@ fn incoming_fixtures() {
app: 1
}
);
assert_eq!(result.node_info.version.to_string(), "v0.34.0");
assert_eq!(result.node_info.version.to_string(), "v0.34.9");
assert!(!result.sync_info.catching_up);
assert_eq!(result.sync_info.latest_app_hash.value(), [0; 8]);
assert!(!result.sync_info.latest_block_hash.is_empty());
Expand Down
4 changes: 2 additions & 2 deletions rpc/tests/kvstore_fixtures/incoming/abci_info.json
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
{
"id": "8a289a02-d7e4-45ed-a0db-97445ff3825a",
"id": "48e82c4a-4018-433e-9557-c40ac927676f",
"jsonrpc": "2.0",
"result": {
"response": {
"app_version": "1",
"data": "{\"size\":0}",
"last_block_app_hash": "AAAAAAAAAAA=",
"last_block_height": "14",
"last_block_height": "32",
"version": "0.17.0"
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
{
"id": "6e4d4855-631f-4cf9-b341-d6460eab88a3",
"id": "ae5b4026-3be4-46ea-876e-2177b74e79a7",
"jsonrpc": "2.0",
"result": {
"response": {
"code": 0,
"codespace": "",
"height": "26",
"height": "44",
"index": "0",
"info": "",
"key": "dHgw",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
{
"id": "fbd10e8a-a975-4e47-8f9a-97646da4cb38",
"id": "a8a36384-95f9-481f-8fc5-4a820176e869",
"jsonrpc": "2.0",
"result": {
"response": {
"code": 0,
"codespace": "",
"height": "14",
"height": "32",
"index": "0",
"info": "",
"key": "bm9uX2V4aXN0ZW50X2tleQ==",
Expand Down
2 changes: 1 addition & 1 deletion rpc/tests/kvstore_fixtures/incoming/block_at_height_0.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@
"data": "height must be greater than 0, but got 0",
"message": "Internal error"
},
"id": "8becc1c0-0c44-4b0d-bb95-8f3ff2f9893e",
"id": "c0ba0ff6-0afd-4d18-bc15-0891696d42f2",
"jsonrpc": "2.0"
}
14 changes: 7 additions & 7 deletions rpc/tests/kvstore_fixtures/incoming/block_at_height_1.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"id": "5be87751-4f5a-4e31-b4db-91a2562a92ea",
"id": "0166b641-4967-4b3c-a36a-73ea4e5a737a",
"jsonrpc": "2.0",
"result": {
"block": {
Expand All @@ -25,10 +25,10 @@
},
"last_commit_hash": "E3B0C44298FC1C149AFBF4C8996FB92427AE41E4649B934CA495991B7852B855",
"last_results_hash": "E3B0C44298FC1C149AFBF4C8996FB92427AE41E4649B934CA495991B7852B855",
"next_validators_hash": "518FD4E4CCE99DB15463EA5EBAD3B2BEE1030C31D53065F2FD7C53F8A7BC3E7C",
"proposer_address": "ABAF73F9A1D15E78CA39C1E066E07D3F3B3BAFAF",
"time": "2020-12-21T07:05:55.8988413Z",
"validators_hash": "518FD4E4CCE99DB15463EA5EBAD3B2BEE1030C31D53065F2FD7C53F8A7BC3E7C",
"next_validators_hash": "ADFA3B40824D69EAD7828B9A78D16D80DFA93499D1DB0EC362916AE61182A64D",
"proposer_address": "ABA577531E6D6F4119E7E1E0EE1909B908A8346D",
"time": "2021-07-16T12:16:29.232984022Z",
"validators_hash": "ADFA3B40824D69EAD7828B9A78D16D80DFA93499D1DB0EC362916AE61182A64D",
"version": {
"app": "1",
"block": "11"
Expand All @@ -48,9 +48,9 @@
}
},
"block_id": {
"hash": "F313BFEE2921709CA1DACF5BB55367FE293194F9C5A9827970C88993D97D47DC",
"hash": "44C37753BF31FD4238227E19213F042699F62416D4E5422BBA59095FC4BEE039",
"parts": {
"hash": "693DBA913A007C963FBF674A99BC79B05A4C0A872E4F839B1B1FD2D7D1B588CD",
"hash": "13EEDAF51D61F7FF513FCD1B6387E0A3E306E49E5DCEA43CAFE77FB7B951DD4B",
"total": 1
}
}
Expand Down
Loading