-
Notifications
You must be signed in to change notification settings - Fork 1.7k
Watch transactions pool #10558
Watch transactions pool #10558
Conversation
It looks like @IntegralTeam hasn't signed our Contributor License Agreement, yet.
You can read and sign our full Contributor License Agreement at the following URL: https://cla.parity.io Once you've signed, please reply to this thread with Many thanks, Parity Technologies CLA Bot |
[clabot:check] |
It looks like @IntegralTeam signed our Contributor License Agreement. 👍 Many thanks, Parity Technologies CLA Bot |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks really good for a WiP, few things to polish.
ethcore/src/miner/miner.rs
Outdated
@@ -262,6 +263,13 @@ impl Miner { | |||
self.transaction_queue.add_listener(f); | |||
} | |||
|
|||
/// Set a callback to be notified | |||
pub fn get_tx_pool_receiver(&self) -> mpsc::Receiver<(H256, String)> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yo can ditch get_
imho.
miner/src/pool/listener.rs
Outdated
/// Transactions pool notifier | ||
#[derive(Default)] | ||
pub struct TransactionsPoolNotifier { | ||
listeners: Vec<Arc<Mutex<mpsc::Sender<(H256, String)>>>>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't really need Arc<Mutex
here afaict.
listeners: Vec<Arc<Mutex<mpsc::Sender<(H256, String)>>>>, | |
listeners: Vec<mpsc::Sender<(H256, String)>>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@tomusdrw If I delete the Arc<Mutex<
, it leads to errors
Compiling ethcore v1.12.0 (/home/user/Work/parity-ethereum/ethcore)
error[E0277]: `std::sync::mpsc::Sender<ethcore_miner::pool::listener::TxStatus>` cannot be shared between threads safely
--> ethcore/src/client/client.rs:1305:6
|
1305 | impl snapshot::DatabaseRestore for Client {
| ^^^^^^^^^^^^^^^^^^^^^^^^^ `std::sync::mpsc::Sender<ethcore_miner::pool::listener::TxStatus>` cannot be shared between threads safely
|
= help: the trait `std::marker::Sync` is not implemented for `std::sync::mpsc::Sender<ethcore_miner::pool::listener::TxStatus>`
= note: required because of the requirements on the impl of `std::marker::Sync` for `std::ptr::Unique<std::sync::mpsc::Sender<ethcore_miner::pool::listener::TxStatus>>`
= note: required because it appears within the type `alloc::raw_vec::RawVec<std::sync::mpsc::Sender<ethcore_miner::pool::listener::TxStatus>>`
= note: required because it appears within the type `std::vec::Vec<std::sync::mpsc::Sender<ethcore_miner::pool::listener::TxStatus>>`
= note: required because it appears within the type `ethcore_miner::pool::listener::TransactionsPoolNotifier`
= note: required because it appears within the type `(ethcore_miner::pool::listener::Logger, ethcore_miner::pool::listener::TransactionsPoolNotifier)`
= note: required because it appears within the type `(ethcore_miner::pool::listener::Notifier, (ethcore_miner::pool::listener::Logger, ethcore_miner::pool::listener::TransactionsPoolNotifier))`
= note: required because it appears within the type `(ethcore_miner::pool::local_transactions::LocalTransactionsList, (ethcore_miner::pool::listener::Notifier, (ethcore_miner::pool::listener::Logger, ethcore_miner::pool::listener::TransactionsPoolNotifier)))`
= note: required because it appears within the type `transaction_pool::pool::Pool<ethcore_miner::pool::VerifiedTransaction, ethcore_miner::pool::scoring::NonceAndGasPrice, (ethcore_miner::pool::local_transactions::LocalTransactionsList, (ethcore_miner::pool::listener::Notifier, (ethcore_miner::pool::listener::Logger, ethcore_miner::pool::listener::TransactionsPoolNotifier)))>`
= note: required because of the requirements on the impl of `std::marker::Sync` for `lock_api::rwlock::RwLock<parking_lot::RawRwLock, transaction_pool::pool::Pool<ethcore_miner::pool::VerifiedTransaction, ethcore_miner::pool::scoring::NonceAndGasPrice, (ethcore_miner::pool::local_transactions::LocalTransactionsList, (ethcore_miner::pool::listener::Notifier, (ethcore_miner::pool::listener::Logger, ethcore_miner::pool::listener::TransactionsPoolNotifier)))>>`
= note: required because it appears within the type `ethcore_miner::pool::TransactionQueue`
= note: required because of the requirements on the impl of `std::marker::Send` for `std::sync::Arc<ethcore_miner::pool::TransactionQueue>`
= note: required because it appears within the type `miner::miner::Miner`
= note: required because of the requirements on the impl of `std::marker::Sync` for `std::sync::Arc<miner::miner::Miner>`
= note: required because it appears within the type `client::client::Importer`
= note: required because it appears within the type `client::client::Client`
...
Found this: Allowing Access from Multiple Threads with Sync
And it seems to me that either Mutex is needed here, or I don't understand what am I doing wrong.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, I see. Yes, unfortunatelly we can't satisfy Sync
with std::mpsc::Sender
. To overcome this I'd go with mpsc::unbounded_channel
from futures
crate, this will also allow us to get rid of this extra thread responsible for polling the channel.
miner/src/pool/listener.rs
Outdated
impl TransactionsPoolNotifier { | ||
/// Add new listener to receive notifications. | ||
pub fn add(&mut self, f: mpsc::Sender<(H256, String)>) { | ||
self.listeners.push(Arc::new(Mutex::new(f))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
self.listeners.push(Arc::new(Mutex::new(f))); | |
self.listeners.push(f); |
miner/src/pool/listener.rs
Outdated
/// Notify listeners about all currently transactions. | ||
fn notify(& mut self, hash: H256, status: String) { | ||
for l in &self.listeners { | ||
let l = l.lock(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This would also clear dropped listeners:
let l = l.lock(); | |
self.listeners | |
.retain(|sender| sender.send((hash, status.clone()).is_ok()) |
miner/src/pool/listener.rs
Outdated
impl txpool::Listener<Transaction> for TransactionsPoolNotifier { | ||
fn added(&mut self, tx: &Arc<Transaction>, _old: Option<&Arc<Transaction>>) { | ||
let hash = tx.hash.clone(); | ||
self.notify(hash, "added".to_string()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it would be better to have an enum like this:
#[derive(Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
enum Status {
Added,
Rejected,
...
Culled,
}
And the same status enum should be sent in the notification.
} | ||
pub fn run(&self, pool_receiver: mpsc::Receiver<(H256, String)>) { | ||
let handler = self.handler.clone(); | ||
thread::spawn(move || loop { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can't we use future-aware channels instead to avoid spawning a new thread?
|
||
} | ||
|
||
pub struct TransactionsNotificationHandler { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Docs
|
||
pub fn notify_transaction(&self, status: (H256, String)) { | ||
for subscriber in self.transactions_pool_subscribers.read().values() { | ||
let status = (status.0.clone(), status.1.clone()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let status = (status.0.clone(), status.1.clone()); | |
let status = status.clone(); |
or even inline it into the next line.
|
||
/// Subscribe to Transactions Pool subscription. | ||
#[pubsub(subscription = "parity_watchTransactionsPool", subscribe, name = "parity_watchTransactionsPool")] | ||
fn subscribe(&self, Self::Metadata, typed::Subscriber<pubsub::Result>, Option<pubsub::Params>); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you don't want any params just leave this out:
fn subscribe(&self, Self::Metadata, typed::Subscriber<pubsub::Result>, Option<pubsub::Params>); | |
fn subscribe(&self, Self::Metadata, typed::Subscriber<pubsub::Result>); |
impl TransactionsPool for TransactionsPoolClient { | ||
type Metadata = Metadata; | ||
|
||
fn subscribe(&self, _meta: Metadata, subscriber: Subscriber<pubsub::Result>, params: Option<pubsub::Params>) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
using pubsub::Result
here kind of suggests that we might get different values of this enum as well. I think it should be safe to just use the tuple directly:
fn subscribe(&self, _meta: Metadata, subscriber: Subscriber<pubsub::Result>, params: Option<pubsub::Params>) { | |
fn subscribe(&self, _meta: Metadata, subscriber: Subscriber<(H256, Status)>, params: Option<pubsub::Params>) { |
miner/src/pool/listener.rs
Outdated
self.notify(hash, TxStatus::Added); | ||
} | ||
|
||
fn rejected(&mut self, tx: &Arc<Transaction>, _reason: &txpool::ErrorKind) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I just realised that this might be called really frequently, and currently we need to iterate over all senders and have a unique lock for this, so this will have drastic performance consequences.
Instead I think we should do something very similar that we do with Notifer
in the same file, i.e:
- We first aggregate all the notifications that should be sent
- Then we send them all at once after batch import to the queue is finished.
It might be even worth to remove Notifier
and convert this other notification to use this (channel-based) notifier instead.
struct Notifier {
pending: HashMap<H256, TxStatus>,
listeners: Vec<futures::sync::mpsc::UnboundedSender<Arc<HashMap<H256, TxStatus>>>>,
}
impl Notifier {
fn notify(&mut self) {
let to_send = Arc::new(std::mem::replace(&mut self.pending, HashMap::new));
self.listeners.retain(|listener| listener.unbounded_send(to_send.clone()).is_ok());
}
}
impl txpool::Listener<Transaction> for TransactionPoolNotifier {
fn added(&mut self, ...) {
self.pending.insert(tx.hash, TxStatus::Added);
}
...
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@IntegralTeam Please let me know if you need any more explanations or help with this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@tomusdrw Thank you, I've got what I need to do.
But, I think, it's not quite right to use HashMap
here, because transaction firstly could be added with according Added
status, and then before sending notification, same transaction could get status Culled
and it will lead that Added
status will be erased from HashMap
Maybe it will be better to use tuple vector Vec<(H256, TxStatus)>
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@IntegralTeam good point, yeah if we never want to miss any event and have them in order, then Vec
is indeed a good choice here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, let's do it this way.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks nice now, I'd like to see the other Notifier
rewritten to futures as well, but we can leave that as a separate PR - please create an issue for this if that happens to be the case.
@@ -303,6 +303,8 @@ impl TransactionQueue { | |||
// Notify about imported transactions. | |||
(self.pool.write().listener_mut().1).0.notify(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This listener now seems obsolete, since it just duplicates the same data and is less efficient than the new channel-based one.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks nice now, I'd like to see the other
Notifier
rewritten to futures as well, but we can leave that as a separate PR - please create an issue for this if that happens to be the case.
Ok, then let`s do it in other PR
This listener now seems obsolete, since it just duplicates the same data and is less efficient than the new channel-based one.
Maybe we can make one Notifier by combining old one Notifier
and TransactionsPoolNotifier
and store two different lists of listeners for parity_watchTransactionsPool
and eth_pubsub
in it?
# Conflicts: # ethcore/src/miner/miner.rs # miner/src/pool/queue.rs # rpc/src/v1/types/pubsub.rs
@tomusdrw Please review |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One last tiny grumble regarding recently added commit, but looks good otherwise.
Could you also create an issue to merge the two listeners?
|
||
let to_send: Vec<(H256, TxStatus)> = hashes.into_iter().map(|hash| (hash.clone(), status)).collect(); | ||
|
||
self.tx_statuses_listeners.retain(| listener| listener.unbounded_send(Arc::new(to_send.clone())).is_ok()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's incorrect, the reason we have Arc
is to avoid cloning the entire Vec
. So:
- First create the
Arc
- Then clone
Arc
notVec
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@tomusdrw I fixed it
18b0800
to
286ba53
Compare
Perfect 👌 |
Issue #9713
Still in progress, but would like some review on current approach.
What is not ready:
I will add more if feedback will be positive