Fix delta metric storage concurrency bug #5932
Conversation
// Grab aggregated points.
List<T> points = new ArrayList<>(aggregatorHandles.size());
aggregatorHandles.forEach(
    (attributes, handle) -> {
      T point = handle.aggregateThenMaybeReset(start, epochNanos, attributes, reset);
      if (reset) {
        aggregatorHandles.remove(attributes, handle);
Here's the bug:
- On line 153 we grab the aggregated data from the handle
- On line 155 we remove the handle from the map
- Removing the handle from the map prompts new measurements for the attributes to add a new handle to the map
- Aggregating the data from the handle and removing it from the map is not atomic. This results in the possibility of lost writes: where another recording thread records to the handle after it has been aggregated but before it is removed from the map.
The solution is to guard the aggregatorHandles map with a read-write lock. When collecting, we obtain a write lock and replace the aggregatorHandles map with a new instance. When recording, we obtain a read lock, which is cheap and blocks the collect thread from replacing the aggregatorHandles map until the record thread releases the read lock.
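To illustrate the idea, here is a minimal, self-contained sketch of that locking scheme. The names are hypothetical, and the real storage keys handles by Attributes and aggregates with AggregatorHandle rather than a plain LongAdder:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical sketch of the read-write lock guard described above; not the SDK's code.
class DeltaStorageSketch {
  private final ReadWriteLock lock = new ReentrantReadWriteLock();
  // Swapped out wholesale during collection; visibility of the swap is provided
  // by the lock's happens-before guarantees, so no volatile is needed here.
  private ConcurrentHashMap<String, LongAdder> handles = new ConcurrentHashMap<>();

  // Record path: a cheap, shared read lock. Many recorders can hold it at once.
  void record(String attributes, long value) {
    lock.readLock().lock();
    try {
      handles.computeIfAbsent(attributes, unused -> new LongAdder()).add(value);
    } finally {
      lock.readLock().unlock();
    }
  }

  // Collect path: the exclusive write lock waits for in-flight recordings to
  // release their read locks, then swaps in a fresh map. The returned handles
  // can be aggregated without racing against new recordings.
  Map<String, LongAdder> collect() {
    ConcurrentHashMap<String, LongAdder> toAggregate;
    lock.writeLock().lock();
    try {
      toAggregate = handles;
      handles = new ConcurrentHashMap<>();
    } finally {
      lock.writeLock().unlock();
    }
    return toAggregate;
  }
}
```

The trade-off is that record() briefly blocks while the collect thread holds the write lock, which is the concern raised in the next comment.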
@@ -83,8 +85,13 @@ Queue<AggregatorHandle<T, U>> getAggregatorHandlePool() {

  @Override
  public void recordLong(long value, Attributes attributes, Context context) {
    AggregatorHandle<T, U> handle = getAggregatorHandle(attributes, context);
    handle.recordLong(value, attributes, context);
    long stamp = sl.readLock();
I have an important note and a question.
Important note: we must never block while recording, as it degrades performance (recording would have to wait for the write lock to be released).
I suggest we create a data structure called AggregatorHandles:
class AggregatorHandles<T, U> {
  final ConcurrentHashMap<Attributes, AggregatorHandle<T, U>> aggregatorHandles = new ConcurrentHashMap<>();
  final ReadWriteLock lock = new ReentrantReadWriteLock();
}
We should have an activeAggregatorHandles and a standbyAggregatorHandles of that type.
When we record, we call readLock().tryLock(). If we get false, it means the write lock has been taken and we need to refresh the value from activeAggregatorHandles (explained below why); so upon false we re-read the value of activeAggregatorHandles and call tryLock() again - this should never fail - if it does, fail.
Upon collecting (a sketch of both paths appears right after this list):
- First we switch the activeAggregatorHandles and standbyAggregatorHandles, saving the value that was in active as the AggregatorHandles we will work on. We make sure to define both the active and standby variables as volatile, so updating them is immediately visible.
- We take the write lock with writeLock().lock(). This will cause us to block until all readers that took a handle have finished recording. Since that isn't user-dependent, it should be near-immediate and guaranteed to happen. It's OK to block in collect() as it's not as latency sensitive as record().
- Since we first made the switch and only then obtained the write lock, right after the assignment to the active variable all record() calls will use the new AggregatorHandles and obtain and use its lock (the newly active lock). The only "left-overs" are record() calls which retrieved the previous AggregatorHandles and haven't yet managed to call readLock().tryLock(). There are two options for them:
  a. writeLock() was already taken, so they get false - this is the signal that activeAggregatorHandles was switched - they re-read its value and obtain a read lock. We can wrap this in a while loop; I have tried hard to find a case where it spins forever and can't see one - it doesn't seem like a realistic option.
  b. readLock().tryLock() returns true and they continue to use the old handles, and writeLock() will wait for them.
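As a rough illustration of the proposal, here is a hypothetical, simplified sketch of both paths. It uses String keys and LongAdder handles in place of Attributes and AggregatorHandle, and assumes a single collector thread; it is not the PR's implementation:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical sketch of the active/standby double-buffer idea; not the SDK's code.
class DoubleBufferedStorageSketch {
  static final class Buffer {
    final ConcurrentHashMap<String, LongAdder> handles = new ConcurrentHashMap<>();
    final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  }

  // Both references are volatile so the buffer switch is immediately visible to recorders.
  private volatile Buffer active = new Buffer();
  private volatile Buffer standby = new Buffer();

  // Record path: never blocks. If tryLock fails, the collector holds the write
  // lock, meaning active was already switched; re-read it and try again.
  void record(String attributes, long value) {
    while (true) {
      Buffer buffer = active;
      if (buffer.lock.readLock().tryLock()) {
        try {
          buffer.handles.computeIfAbsent(attributes, unused -> new LongAdder()).add(value);
          return;
        } finally {
          buffer.lock.readLock().unlock();
        }
      }
      // Write lock held: loop and pick up the newly active buffer.
    }
  }

  // Collect path: switch buffers first, then take the write lock on the old
  // buffer to wait out recorders that grabbed it just before the switch.
  ConcurrentHashMap<String, LongAdder> collect() {
    Buffer toCollect = active;
    active = standby; // from this point on, record() uses the other buffer
    standby = toCollect;
    toCollect.lock.writeLock().lock();
    try {
      // Aggregate and reset the handles in toCollect here, before this buffer
      // becomes active again on a later collection cycle.
      return toCollect.handles;
    } finally {
      toCollect.lock.writeLock().unlock();
    }
  }
}
```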
Question: why use StampedLock rather than ReentrantReadWriteLock, which seems much easier to reason about and doesn't require keeping a stamp around in memory? It's harder to reason about the code when you see the stamp returned by the lock without understanding why the stamp is needed.
I'm switching to ReentrantReadWriteLock for reasons described here.
We must never block while recording, as it degrades performance (recording would have to wait for the write lock to be released).
Sure. The approach you outline is optimistic reads. There are a number of different ways to do this, and they appear to be simpler and higher performance with StampedLock. But in either case, I'll rework it so recording measurements doesn't have to block.
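For readers unfamiliar with the pattern, here is a generic illustration of a StampedLock optimistic read - not code from this PR - in which the reader proceeds without locking and only falls back to a real read lock if a writer intervened:

```java
import java.util.concurrent.locks.StampedLock;

// Generic StampedLock optimistic-read example; unrelated to the SDK's classes.
class OptimisticReadExample {
  private final StampedLock sl = new StampedLock();
  private long state;

  long read() {
    long stamp = sl.tryOptimisticRead(); // no blocking or lock acquisition on the hot path
    long value = state;                  // speculative read
    if (!sl.validate(stamp)) {           // a write happened in between; fall back
      stamp = sl.readLock();
      try {
        value = state;
      } finally {
        sl.unlockRead(stamp);
      }
    }
    return value;
  }

  void write(long newValue) {
    long stamp = sl.writeLock();
    try {
      state = newValue;
    } finally {
      sl.unlockWrite(stamp);
    }
  }
}
```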
We're still blocking :) Once a collection starts you grab the write lock, which blocks all read locks until the collection has finished. Hence I suggested the different approach outlined above.
The logic you suggest does check out and does not block. I'm running the performance tests now to evaluate whether reading a volatile variable on every record in the hot path degrades performance in a serious way. It could be the case that reading a non-volatile variable and only blocking for an extremely short amount of time once per collection is better than reading a volatile variable on every record but never blocking.
OK, here are the JMH results for the plain read-write-lock solution, which blocks for a narrow window when collecting, vs. @asafm's proposal to always read from a volatile but never block. Source code for the volatile, never-blocking solution is here.
The solution that always reads from a volatile and never blocks reduces performance on the record path by a modest ~4% versus the read-write-lock approach that blocks briefly during collection. It's also worth noting that, as implemented, the volatile never-block approach impacts cumulative performance as well, which isn't strictly necessary since this concurrency bug only affects delta. Cumulative should be able to read a non-volatile variable safely since it never needs to change it.
Pushed a commit to use ReentrantReadWriteLock instead of StampedLock because StampedLock isn't supported in Android API level 21. The performance is slightly worse; see the benchmark for ReentrantReadWriteLock vs StampedLock, which tracks what I've read online. I opened #5936 because I'm beginning to be frustrated with the Android requirements. If StampedLock were on the table, we might be able to get better performance with optimistic reads.
Codecov Report: All modified and coverable lines are covered by tests ✅
... and 2 files with indirect coverage changes
Following the SIG discussion, I support the volatile approach.
Per conversation in the 11/9 Java SIG, I've pushed a commit with the non-blocking volatile approach outlined by @asafm. This ensures the record path is non-blocking and that metrics are correct, but there may be ways to improve the implementation related to the problems described here. I will try to follow up on this in a future PR to see if further optimization is possible.
@asafm has identified a rather nefarious concurrency bug in the DefaultSynchronousMetricStorage for delta aggregation temporality.
This PR introduces a test which recreates the issue almost every time, and a fix utilizing a read-write lock. The idea is that while recording we take a relatively cheap read lock, and while collecting we take the more expensive write lock.
This results in a modest reduction in performance, but it seems to be the best way to solve the problem, and correctness should trump performance. Here's the before and after of MetricsBenchmark, which focuses on heavily exercising the record path.
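For a sense of how such a race can be reproduced, here is a hypothetical sketch - not the PR's actual test - that hammers the record path from several threads while collecting, then checks that the collected deltas add up to everything recorded. It reuses the hypothetical DeltaStorageSketch from the earlier comment:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical reproduction sketch; with the lost-write bug, collected < recorded.
class LostWriteRaceSketch {
  public static void main(String[] args) throws Exception {
    DeltaStorageSketch storage = new DeltaStorageSketch(); // sketch from the earlier comment
    LongAdder recorded = new LongAdder();
    LongAdder collected = new LongAdder();

    ExecutorService recorders = Executors.newFixedThreadPool(4);
    for (int i = 0; i < 4; i++) {
      recorders.submit(
          () -> {
            for (int j = 0; j < 1_000_000; j++) {
              storage.record("key", 1);
              recorded.add(1);
            }
          });
    }

    // Collect concurrently with recording to exercise the race window.
    for (int i = 0; i < 100; i++) {
      storage.collect().values().forEach(adder -> collected.add(adder.sum()));
      Thread.sleep(1);
    }

    recorders.shutdown();
    recorders.awaitTermination(1, TimeUnit.MINUTES);
    // Final collection picks up whatever is left after the recorders finish.
    storage.collect().values().forEach(adder -> collected.add(adder.sum()));

    System.out.println("recorded=" + recorded.sum() + ", collected=" + collected.sum());
  }
}
```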