From 2064fbd34fb89b5bafc1ef89ed473848e5890c95 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Tue, 25 Aug 2020 22:21:44 +0700 Subject: [PATCH] Fix race condition in GroupedWithin. (#4555) --- src/core/Akka.Streams/Implementation/Fusing/Ops.cs | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/src/core/Akka.Streams/Implementation/Fusing/Ops.cs b/src/core/Akka.Streams/Implementation/Fusing/Ops.cs index 7d16aac4d95..9bd3376dca1 100644 --- a/src/core/Akka.Streams/Implementation/Fusing/Ops.cs +++ b/src/core/Akka.Streams/Implementation/Fusing/Ops.cs @@ -3020,6 +3020,14 @@ public void OnPush() public void OnUpstreamFinish() { _finished = true; + + // Fix for issue #4514 + // Force check if we have a dangling last element because: + // OnTimer may close the group just before OnUpstreamFinish is called + // (race condition), dropping the last element in the stream. + if (IsAvailable(_stage._in)) + NextElement(Grab(_stage._in)); + if (_groupEmitted) CompleteStage(); else @@ -3052,8 +3060,11 @@ private void NextElement(T element) ScheduleRepeatedly(GroupedWithinTimer, _stage._timeout); CloseGroup(); } - else + // Do not pull if we're finished. + else if (!_finished) + { Pull(_stage._in); + } } private void CloseGroup()