Runtime diagnostics for leaked messages in unbounded channels #12971
Conversation
Converted the PR into a draft to address all unbounded channels of Substrate in one PR.
LGTM. Did you test if the error is reported when the threshold is crossed?
Yes, I hardcoded the threshold to
```rust
{
	// `warning_fired` and `queue_size` are not synchronized, so it's possible
	// that the warning is fired a few times before `warning_fired` is seen
	// by all threads. This seems better than introducing a mutex guarding them.
```
Why is it better? Would proper synchronization (e.g. a mutex) be a serious performance penalty here? Or is there another reason?
I'd rather have an extra warning than lock contention, though I don't know how many threads there usually are.
This error should never be emitted under normal circumstances, and the probability of emitting more than one error is also quite small IMO, so I would avoid locking/holding a mutex every time a message/event is sent over the channel. First of all, channels are meant to avoid synchronization, not to introduce it.
```diff
-pub fn buffered_link<B: BlockT>() -> (BufferedLinkSender<B>, BufferedLinkReceiver<B>) {
-	let (tx, rx) = tracing_unbounded("mpsc_buffered_link");
+pub fn buffered_link<B: BlockT>(
+	queue_size_warning: i64,
```
Why do we need a signed integer here?
It's explained in the comment on the struct field: to avoid underflow if, due to the lack of ordering, the counter happens to go below 0.
Internally: yes. But the public API doesn't need a signed integer. This should have been `u32` instead, which is still plenty big for all intents and purposes. Same for the `tracing_unbounded` function.
I'm also wondering how much of a performance difference using Relaxed ordering here actually makes.
It's not that relaxed ordering makes sense in terms of performance; it's about not having to bother with synchronization of increments/decrements, which is why a signed integer is used. Relaxed ordering is just a consequence of that decision: stronger guarantees are not needed if we use a signed integer.
I've looked into the issue, and as far as I understand it's impossible to guarantee that the counter is never decremented before it's incremented without relying on the internals of `mpsc::unbounded()`. Basically, we have the following events:

| Thread A | Thread B |
|---|---|
| increment | |
| push | |
| | pull |
| | decrement |

In order for `decrement` to never happen before `increment`, `push` in thread A must synchronize with `pull` in thread B. Note that this is not synchronization between operations on our atomic counter, but synchronization of the `mpsc::unbounded()` operations we are not in control of. We can try setting the strongest, sequentially consistent ordering guarantee for `increment` and `decrement`, but for this to work `push` and `pull` must also be sequentially consistent operations, which is unlikely and cannot be relied on.

Please correct me if I'm missing something.
CC @bkchr
If you use `Acquire`/`Release` it should work: https://en.cppreference.com/w/cpp/atomic/memory_order
The compiler should add a barrier that ensures that reads/writes are not reordered.
BTW @nazar-pc, why don't you just use a channel with a size of 0 and `try_send`?
Because I didn't see `try_send` in there; also, it is usually for different access patterns. I'd expect it to still produce a warning regardless.
Here is a PR implementing exact queue size warning (#13117), but I'd like it to be reviewed by somebody with good understanding of concurrency, atomics, and memory order of operations. If you know who to invite for review, please invite them.
```rust
pub fn tracing_unbounded<T>(
	key: &'static str,
	name: &'static str,
	queue_size_warning: i64,
) -> (TracingUnboundedSender<T>, TracingUnboundedReceiver<T>) {
	let (s, r) = mpsc::unbounded();
```
Do you (or anyone else) know the reason for using unbounded channels here? AFAIK they are never a good idea in production.
This is a long-running story about implementing back-pressure mechanisms in Substrate (using bounded channels), but nobody knows how to implement it correctly so far. So at least we added the warning to detect whether some of the channels are not being polled and leak messages (and memory).
> but nobody knows how to implement it correctly so far

This is clearly not the problem :P The code base has grown organically. Not all of it was built from the beginning with async in mind. Unbounded channels give you an easy way to combine async and sync code, but yeah, there is no back pressure.
Makes sense. Thanks 🙏
```rust
queue_size: queue_size.clone(),
queue_size_warning,
warning_fired: Arc::new(AtomicBool::new(false)),
creation_backtrace: Arc::new(Backtrace::capture()),
```
We don't need to resolve it, and we should not require `RUST_BACKTRACE` to be set. If we then want to print the backtrace, we should call `resolve()`.
Addressed in #13020.
```rust
let queue_size = self.queue_size.fetch_add(1, Ordering::Relaxed);
if queue_size == self.queue_size_warning &&
	!self.warning_fired.load(Ordering::Relaxed)
```
Why are you not using https://doc.rust-lang.org/std/sync/atomic/struct.AtomicBool.html#method.compare_exchange?
Also addressed in #13020.
Resolves #12853.

Report an error if the unbounded channels `out_events::channel()` and `mpsc::tracing_unbounded()` grow above the predefined configurable threshold. The channel name and the backtrace of where it was created (if `RUST_BACKTRACE=1` is set) are reported.