-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix potential NoDelta leak into IReplicatedData.Merge() #4811
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,6 +12,7 @@ | |
using Akka.Actor; | ||
using Akka.DistributedData.Internal; | ||
using Akka.Event; | ||
using Akka.Util.Internal; | ||
|
||
namespace Akka.DistributedData | ||
{ | ||
|
@@ -68,13 +69,11 @@ public ImmutableDictionary<Address, DeltaPropagation> CollectPropagations() | |
if (all.Length <= sliceSize) slice = all; | ||
else | ||
{ | ||
var start = (int)(_deltaNodeRoundRobinCounter % all.Length); | ||
var buffer = new Address[sliceSize]; | ||
for (var i = 0; i < sliceSize; i++) | ||
{ | ||
buffer[i] = all[(start + i) % all.Length]; | ||
} | ||
slice = ImmutableArray.CreateRange(buffer); | ||
var i = (int)(_deltaNodeRoundRobinCounter % all.Length); | ||
slice = all.Slice(i, sliceSize).ToImmutableArray(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This code looks different than the reference scala because in scala, slice parameters are There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks - I was just about to ask |
||
|
||
if (slice.Length != sliceSize) | ||
slice = slice.AddRange(all.Take(sliceSize - slice.Length)); | ||
} | ||
|
||
_deltaNodeRoundRobinCounter += sliceSize; | ||
|
@@ -105,19 +104,17 @@ public ImmutableDictionary<Address, DeltaPropagation> CollectPropagations() | |
var cacheKey = (key, fromSeqNr, toSeqNr); | ||
if (!cache.TryGetValue(cacheKey, out var deltaGroup)) | ||
{ | ||
using (var e = deltaEntriesAfterJ.Values.GetEnumerator()) | ||
deltaGroup = deltaEntriesAfterJ.Values.Aggregate((d1, d2) => | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Replaced the suspicious looking code block with one that makes a lot more sense. val group = deltaEntriesAfterJ.valuesIterator.reduceLeft { (d1, d2) =>
val merged = d2 match {
case NoDeltaPlaceholder => NoDeltaPlaceholder
case _ =>
// this is fine also if d1 is a NoDeltaPlaceholder
d1.merge(d2.asInstanceOf[d1.T])
}
merged match {
case s: ReplicatedDeltaSize if s.deltaSize >= maxDeltaSize =>
// discard too large deltas
NoDeltaPlaceholder
case _ => merged
}
}
|
||
{ | ||
e.MoveNext(); | ||
deltaGroup = e.Current; | ||
while (e.MoveNext()) | ||
{ | ||
deltaGroup = deltaGroup.Merge(e.Current); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The bug happened here, a |
||
if (deltaGroup is IReplicatedDeltaSize s && s.DeltaSize > MaxDeltaSize) | ||
{ | ||
deltaGroup = DeltaPropagation.NoDeltaPlaceholder; | ||
} | ||
} | ||
} | ||
var merged = ReferenceEquals(d2, DeltaPropagation.NoDeltaPlaceholder) | ||
? DeltaPropagation.NoDeltaPlaceholder | ||
: d1.Merge(d2); | ||
|
||
if (merged is IReplicatedDeltaSize s && s.DeltaSize >= MaxDeltaSize) | ||
return DeltaPropagation.NoDeltaPlaceholder; // discard too large deltas | ||
|
||
return merged; | ||
}); | ||
|
||
cache[cacheKey] = deltaGroup; | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This part of the code also looks suspicious, so I re-ported it with a cleaner one.
Reference scala code: