-
Notifications
You must be signed in to change notification settings - Fork 224
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
rpc: event subscription management for RPC client #516
Conversation
Signed-off-by: Thane Thomson <connect@thanethomson.com>
Signed-off-by: Thane Thomson <connect@thanethomson.com>
This version of the transport provides a request matching interface, along with an implementation of a request method-based matcher (produces specific fixed responses according to the request's method). Signed-off-by: Thane Thomson <connect@thanethomson.com>
Signed-off-by: Thane Thomson <connect@thanethomson.com>
Signed-off-by: Thane Thomson <connect@thanethomson.com>
This commit adds the bulk of the subscription management machinery as well as a rudimentary integration test (which should be marked as `#[ignore]` ASAP) just to demonstrate obtaining new block events from a Tendermint instance running locally with default settings. Signed-off-by: Thane Thomson <connect@thanethomson.com>
Signed-off-by: Thane Thomson <connect@thanethomson.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Had a first pass, left a bunch of comments regarding structure and style inline.
Generally this shapes up to be quite nice. The main concern I'd like to raise is that it seems a rather big amount of abstractions. There are a wide array of indirections and things that have to be plugged together to make the machinery work, i.e. Manager, Router, Subscription, SubscriptionTransport, EventConnection, EventProducer and so on. Another indicator that it's not yet the entirely correct set of abstractions is if you follow the imports it looks like deeply nested modules import entities from their top-level module. It would be nice to come to a more concise and clear hierarchy. Happy to discuss in more detail what I think could be reduced. Generally it would help to think along the lines of what needs to be injected/configurable for different environments.
rpc/src/client/subscription.rs
Outdated
#[derive(Debug)] | ||
pub struct Subscription { | ||
id: SubscriptionId, | ||
query: String, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this not be a native Query
type?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I actually had implemented a reasonable-looking query implementation in one of my previous iterations, so I'll bring it back in here. I just wanted some feedback on the overarching architecture first prior to adding in more details.
Co-authored-by: Alexander Simmerl <a.simmerl@gmail.com>
Co-authored-by: Alexander Simmerl <a.simmerl@gmail.com>
Co-authored-by: Alexander Simmerl <a.simmerl@gmail.com>
Co-authored-by: Alexander Simmerl <a.simmerl@gmail.com>
Co-authored-by: Alexander Simmerl <a.simmerl@gmail.com>
Co-authored-by: Alexander Simmerl <a.simmerl@gmail.com>
Thanks @xla! I actually had an idea this morning during our onboarding as to how to refactor this a bit to reduce the number of abstractions. For one thing, right now there are effectively two asynchronous processes (outside of the originating task) facilitating the subscription mechanism:
I've realized I can probably reduce this logic to just one async task (the |
Signed-off-by: Thane Thomson <connect@thanethomson.com>
Signed-off-by: Thane Thomson <connect@thanethomson.com>
Signed-off-by: Thane Thomson <connect@thanethomson.com>
Signed-off-by: Thane Thomson <connect@thanethomson.com>
The previous iteration of the subscription mechanism relied on two asynchronous tasks to manage subscriptions and route events from the WebSockets connection to their relevant subscribers. This version only uses one async task (the WebSocket driver, which must interact asynchronously with the connection). In addition to this, I tried experimenting with using generics instead of trait objects for the transport layer for the `Client`. This works, but the downside of using generics is that, if you want generalizability at layers higher up, it comes at the cost of decreased readability of the code. Signed-off-by: Thane Thomson <connect@thanethomson.com>
Signed-off-by: Thane Thomson <connect@thanethomson.com>
Signed-off-by: Thane Thomson <connect@thanethomson.com>
Signed-off-by: Thane Thomson <connect@thanethomson.com>
After much consideration, I decided to do away with the `Transport` and related traits entirely. It makes more sense, and results in better ability to test, if we abstract the `Client` interface instead. This also allows for greater flexibility when implementing different kinds of transports, while still allowing for lazy/optional instantiation of the subscription mechanism. Additional integration tests are included here, as well as additional documentation. Signed-off-by: Thane Thomson <connect@thanethomson.com>
Signed-off-by: Thane Thomson <connect@thanethomson.com>
Signed-off-by: Thane Thomson <connect@thanethomson.com>
Signed-off-by: Thane Thomson <connect@thanethomson.com>
Signed-off-by: Thane Thomson <connect@thanethomson.com>
While I work on updating #476, the quickest way to get a feel for the new architecture here is to clone this branch locally and take a look at the docs: cd ./rpc/
cargo doc --all-features --open |
Signed-off-by: Thane Thomson <connect@thanethomson.com>
* Add first draft of ADR-008 Signed-off-by: Thane Thomson <connect@thanethomson.com> * Add comment about WebSockets connection Signed-off-by: Thane Thomson <connect@thanethomson.com> * Rename to "RPC Client" for greater clarity Signed-off-by: Thane Thomson <connect@thanethomson.com> * Update docs/architecture/adr-008-event-subscription.md Co-authored-by: Alexander Simmerl <a.simmerl@gmail.com> * Update to reflect work on #516 Signed-off-by: Thane Thomson <connect@thanethomson.com> * Leave out implementation details (ADR) Signed-off-by: Thane Thomson <connect@thanethomson.com> * Clarify handle/driver concurrency model Signed-off-by: Thane Thomson <connect@thanethomson.com> * Update with latest architecture inspired by #516 Signed-off-by: Thane Thomson <connect@thanethomson.com> * Condense Client trait example for readability Signed-off-by: Thane Thomson <connect@thanethomson.com> * Reduce complexity of subscription interface Signed-off-by: Thane Thomson <connect@thanethomson.com> * Introduce ClosableClient trait Signed-off-by: Thane Thomson <connect@thanethomson.com> * Reduce number of client implementations Signed-off-by: Thane Thomson <connect@thanethomson.com> * Remove SubscriptionRouter from ADR discussion (it is an implementation detail) Signed-off-by: Thane Thomson <connect@thanethomson.com> * Update ERD to reflect current architecture Signed-off-by: Thane Thomson <connect@thanethomson.com> * Add note on bounded vs unbounded channels in subscriptions Signed-off-by: Thane Thomson <connect@thanethomson.com> * Fix link for async drop discussion Signed-off-by: Thane Thomson <connect@thanethomson.com> * Add more structure and explanation to the Subscription section Signed-off-by: Thane Thomson <connect@thanethomson.com> * Add Subscription section on leveraging futures::stream utils Signed-off-by: Thane Thomson <connect@thanethomson.com> * Clarify ERD by using colours instead of line thickness Signed-off-by: Thane Thomson <connect@thanethomson.com> Co-authored-by: Alexander Simmerl <a.simmerl@gmail.com>
Merges the latest changes from `master` while adjusting for Tendermint v0.34 compatibility. Signed-off-by: Thane Thomson <connect@thanethomson.com>
Signed-off-by: Thane Thomson <connect@thanethomson.com>
Signed-off-by: Thane Thomson <connect@thanethomson.com>
@thanethomson Just pushed a fix for the last remaining WASM build error. |
Signed-off-by: Thane Thomson <connect@thanethomson.com>
Awesome, thanks @romac! 👍 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like great work. Finally got a chance to at least read through it all but still some pieces I want to look more closely at.
While the ADR diagram is helpful for outlining the components, I think an even more helpful diagram might be one of the concurrency, showing what the different threads are and how the channel connectivity works and who's sending msgs to whom
light-client/Cargo.toml
Outdated
@@ -28,7 +28,7 @@ secp256k1 = ["tendermint/secp256k1", "tendermint-rpc/secp256k1"] | |||
|
|||
[dependencies] | |||
tendermint = { version = "0.16.0", path = "../tendermint" } | |||
tendermint-rpc = { version = "0.16.0", path = "../rpc", default-features = false } | |||
tendermint-rpc = { version = "0.16.0", path = "../rpc", features = ["client", "transport_http"] } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we might want to preserve this default-features = false
and list the features above (line 26)?
Also what's the significant of the transport_http feature? When would someone not want it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed this 👍
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As for the transport_http
feature, I'm not sure. Are there instances where someone may want to exclusively use WebSocket-based subscription, but not perform any other requests via the HTTP interface?
Also, when we implement a WebSocket-based RPC client for the rest of the RPC methods (i.e. those currently only supported via HTTP), would we want to include the HTTP client by default?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's a question about dependencies. I would expect websockets to include a superset of the http dependencies, or at least the majority ++. If there's any extra deps in the http client that are not in the websocket client I'd expect them to be minimal and so the cost of just bundling them would be small.
So,
Are there instances where someone may want to exclusively use WebSocket-based subscription, but not perform any other requests via the HTTP interface?
Probably, but not sure it's worth the feature distinction to have websockets without the http.
That said we may want to preserve http without websockets.
So I would imagine three layers of use cases:
- default (no http or websockets, just rpc types)
- http (no websockets)
- full (websockets, http, everything)
Hopefully that would simplify things. In general for now I think we should air on the side of simplicity and consolidation rather than flexibility and choice, unless we have strong existing demand/reason for flexibility in certain places.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So the HttpClient
requires the use of the hyper
and http
crates, whereas the WebSocket-based subscription client doesn't.
If you'd like to simplify, can I suggest doing away with the subscription
feature and shortening the feature names:
Flag | Description | Additional Dependencies |
---|---|---|
default |
Just the RPC types | None |
client |
All the client-related traits, but no implementations | async-trait , futures , tokio |
http |
HttpClient (implements Client ) |
http , hyper , tokio |
websocket |
WebSocketSubscriptionClient , and in future a WebSocketClient , which will implement the Client and SubscriptionClient traits and will supersede the WebSocketSubscriptionClient |
async-tungstenite , tokio |
mock |
MockClient and MockSubscriptionClient |
tokio |
Perhaps it's worth it here to rename the existing WebSocketSubscriptionClient
to WebSocketClient
, so its name doesn't change in future?
Also, we could do away with the mock clients if you want. They were useful in my testing, but it's up to downstream users (e.g. IBC) to say if they'd see value in such a mock client.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For posterity, we've simplified this even further now, as per #569
let peer_addr = self.peer_map.get(&peer).unwrap().to_owned(); | ||
rpc::Client::new(peer_addr) | ||
Ok(rpc::HttpClient::new(peer_addr).map_err(IoError::from)?) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe pedantic but shouldn't the PeerId type already hold a valid address and hence this shouldn't need to return an error?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Technically a Tendermint Address
could contain either a TCP or a Unix socket address, and we need the host/port details in order to connect to the RPC endpoint.
We could do away with the need for an error here by requiring host/port details instead of a peer_addr
, or we could just panic if a user supplies a Unix socket address?
client = [ "async-trait", "futures" ] | ||
secp256k1 = [ "tendermint/secp256k1" ] | ||
subscription = [ "tokio" ] | ||
transport_http = [ "http", "hyper", "tokio" ] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe this should just be consolidated with client? How much benefit do we really get by opting for the transport_mock over the transport_http ?
) -> Result<abci_query::AbciQuery, Error> { | ||
) -> Result<abci_query::AbciQuery> | ||
where | ||
V: Into<Vec<u8>> + Send, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure why we needed this to be impl Trait
in the first place but the added type bound suggests maybe we should reconsider? Could it not just be Vec from the get go?
Not necessarily to address in this PR but follow up along with the Height comment
/// ## Examples | ||
/// | ||
/// ```rust,ignore | ||
/// use tendermint_rpc::{WebSocketSubscriptionClient, SubscriptionClient, ClosableClient}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would be great if we only needed one import here but I guess there's no way to avoid needing to explicitly import the traits?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd love to have a way to do away with these extra imports, just not sure how. @romac do you perhaps know of some kind of trick to consolidate these imports? (I can only think of hairy macro-based ones)
return Ok(()); | ||
} | ||
self.router | ||
.pending_add(id.as_ref(), &id, query, event_tx, result_tx); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the diff between id_as_ref()
and &id
and should these two things really be coming from the same id
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was technically an implementation of the AsRef
trait to provide a &str
reference, but looking at it now it's probably somewhat ambiguous. I've now implemented an as_str()
method to return a &str
instead, which I think should be clearer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But it's also a bit jarring that these two distinct Ids are actually the same. Is that intended? Maybe could be documented/clarified somewhere the subscription id will be drawn from the json-rpc request id ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Originally I had them both be exactly the same, but then I ran into an issue where I needed to send unsubscribe requests with their own request ID (distinct from the subscription ID) to ensure that the response from the remote endpoint is, in fact, exclusively relevant to the unsubscribe request. Otherwise we may confuse an error meant for the subscription with one meant for the unsubscribe request.
So subscribe requests' IDs are the same as the subscription ID (to correlate incoming events, and possibly errors, relating to the subscription), but unsubscribe requests' IDs need to be distinct.
pub fn localhost_rpc_client() -> Client { | ||
Client::new("tcp://127.0.0.1:26657".parse().unwrap()) | ||
pub fn localhost_rpc_client() -> HttpClient { | ||
HttpClient::new("tcp://127.0.0.1:26657".parse().unwrap()).unwrap() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems like there's an unnecessary extra .unwrap() here from an API perspective, no? Related to my other PeerId comment
For some reason, I cannot reply inline in the PR, so here it is:
Yes, we might also want to have a |
…clarity Signed-off-by: Thane Thomson <connect@thanethomson.com>
Signed-off-by: Thane Thomson <connect@thanethomson.com>
Signed-off-by: Thane Thomson <connect@thanethomson.com>
Signed-off-by: Thane Thomson <connect@thanethomson.com>
Signed-off-by: Thane Thomson <connect@thanethomson.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A few more thoughts while I continue to dive into this.
If we bring back the old event listener, then I think it's fine to merge this (ie. it should be mostly non-breaking) and then we can continue to review and improve it from master
@@ -1,216 +0,0 @@ | |||
//! Tendermint Websocket event listener client |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we should bring this back for now so folks can upgrade their websocket code at their own pace?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The trouble with bringing this back is that the types exposed by the event listener conflict with the types I've added in rpc/src/event.rs
. As far as I can see in IBC it's just this code that needs to be updated: https://github.com/informalsystems/ibc-rs/blob/master/relayer/src/event_monitor.rs
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok fair enough. Let's merge this as is and once ibc is ready to upgrade to our master we can work on integration there.
), | ||
), | ||
}, | ||
Some(cmd) = self.cmd_rx.recv() => match cmd { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I find it a bit confusing that subscriptions come over the cmd_rx channel but the unsubscriptions come over a separate terminate channel. Why not give the Susbcription objects the cmd_tx and let them send Unsubscribe commands over that?
} | ||
|
||
async fn run(mut self) -> Result<()> { | ||
// TODO(thane): Should this loop initiate a keepalive (ping) to the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe so. At least the Go client does this and I think it's proper behaviour for a websocket client
loop { | ||
tokio::select! { | ||
Some(res) = self.stream.next() => match res { | ||
Ok(msg) => self.handle_incoming_msg(msg).await?, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So I'm trying to understand what benefit we're get from using async/await, if any. It seems that pretty much every possibly async function call has an await immediately after, so everything in this run
method is effectively happening synchronously. Is that true? It seems it may apply more broadly than just this run method, but since this runs in it's own thread and is relatively contained, it's easier to analyze.
I was also somewhat surprised to see the use of an explicit thread for this when we were using async/await for everything since I might have naively thought that the runtime would manage threads for us and we could specify through our use of .await where we wanted to block.
But it's possible I'm not thinking about this right, these are just initial thoughts.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great work. Let's merge this and open a follow up issue to track some fixes (eg adding pings) and any future work (refactor of APIs or internals)
Addressed or to be dealt with in follow ups
Great stuff @thanethomson! 💯 |
Closes #313
In the spirit of our session today, I'm building and testing this alongside developing ADR #476 at the moment (it diverges slightly from the ADR right now, but I will update the ADR tomorrow).
After several iterations, I think I've come up with a subscription management interface and a transport abstraction layer that could mostly work for our purposes.
Since this is a pretty large PR, the quickest way to get into it is to first read the updated crate docs for the
tendermint-rpc
package:cd ./rpc/ cargo doc --all-features --open