Skip to content

Commit

Permalink
Fix race condition in GroupedWithin. (#4555)
Browse files Browse the repository at this point in the history
  • Loading branch information
Arkatufus authored Aug 25, 2020
1 parent a4c0fd9 commit 2064fbd
Showing 1 changed file with 12 additions and 1 deletion.
13 changes: 12 additions & 1 deletion src/core/Akka.Streams/Implementation/Fusing/Ops.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3020,6 +3020,14 @@ public void OnPush()
public void OnUpstreamFinish()
{
_finished = true;

// Fix for issue #4514
// Force check if we have a dangling last element because:
// OnTimer may close the group just before OnUpstreamFinish is called
// (race condition), dropping the last element in the stream.
if (IsAvailable(_stage._in))
NextElement(Grab(_stage._in));

if (_groupEmitted)
CompleteStage();
else
Expand Down Expand Up @@ -3052,8 +3060,11 @@ private void NextElement(T element)
ScheduleRepeatedly(GroupedWithinTimer, _stage._timeout);
CloseGroup();
}
else
// Do not pull if we're finished.
else if (!_finished)
{
Pull(_stage._in);
}
}

private void CloseGroup()
Expand Down

0 comments on commit 2064fbd

Please sign in to comment.