Akka.Streams: weird behavior with RepeatPrevious #7217

Closed
Aaronontheweb opened this issue May 31, 2024 · 12 comments · Fixed by #7219

@Aaronontheweb
Member

Version Information
Version of Akka.NET? v1.5.20
Which Akka.NET Modules? Akka.Streams

Describe the bug

Consider the following code:

var actorSystem = ActorSystem.Create("TestSys");

var source1 = Source.From(new[] { 1, 2, 3, 4 });
var source2 = Source.From(new[] { "Celsius", "Fahrenheit" });
var source3 = Source.From(new[] { 10.0 });

var source4 = source1.Zip(source2.RepeatPrevious())
   .ZipWith(source3.RepeatPrevious(), (a, b) => (a.Item1, a.Item2, b));
   
await source4.RunForeach(c => Console.WriteLine(c), actorSystem);

We get the following output:

(1, Celsius, 10)
(2, Celsius, 10)

Expected behavior

I would expect we would get at least 4 elements - 1 for each of the items in source1. Why don't we?

Actual behavior

We only get 2 elements - and we never have a single "Fahrenheit" element appear.

Additional context

This could be a misconfiguration with how the various sources are being zipped together, but it doesn't read that way to me - this seems legitimate and like it should produce 4 total elements.

@Aaronontheweb
Member Author

Accidentally tagged the wrong issue on #7219

@Arkatufus
Contributor

Arkatufus commented May 31, 2024

It is the .RepeatPrevious() that makes this confusing.
If we modify the repro code to this:

var source1 = Source.From(new[] { 1, 2, 3, 4 });
var source2 = Source.From(new[] { "C", "F" });
var source3 = source1
    .Zip(source2.RepeatPrevious());

await source3.RunForeach(c => Console.WriteLine(c.ToString()), Materializer);

This is what happens internally inside the ReuseLatest stage (the logic behind RepeatPrevious):

  1. OnPull(): no last value, calling Pull(Stage.In)
  2. OnPush():
    1. calling Grab(Stage.In)
    2. setting last value = "C"
    3. calling Push(Stage.Out, "C")
  3. OnPull():
    1. last value is "C" and Stage.In has not been pulled: calling Pull(Stage.In)
    2. calling Push(Stage.Out, "C")
  4. OnPush():
    1. calling Grab(Stage.In)
    2. setting last value = "F"
    3. Stage.Out is not available, skipping Push
  5. OnPull():
    1. last value is "F" and Stage.In has not been pulled: calling Pull(Stage.In)
    2. calling Push(Stage.Out, "F")
  6. Source exhausted, stage completed.

As you can see, there are 3 pushes ("C", "C", "F") before the source completes the stream. The element "C" is emitted twice.
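
The steps above can be sketched as a small pull-based model in plain C# (LINQ only, no Akka.Streams dependency). This is a simplification for reasoning about the trace, not the actual ReuseLatest implementation: the class `ReuseLatestModel` and the helper `Emissions` are hypothetical names, and `Enumerable.Zip` stands in for the stream `Zip`/`ZipWith` stages on the assumption that both stop as soon as either side completes.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class ReuseLatestModel
{
    // Hypothetical helper: the sequence of elements a downstream consumer
    // would see, following the OnPull/OnPush steps listed in the thread.
    // This is a model for illustration, not Akka.Streams code.
    public static IEnumerable<T> Emissions<T>(IEnumerable<T> upstream)
    {
        using var e = upstream.GetEnumerator();

        // First pull: no last value yet, so pull upstream and forward the element.
        if (!e.MoveNext()) yield break;
        var last = e.Current;
        yield return last;

        while (true)
        {
            // Every later pull pushes the stored value.
            yield return last;

            // Before the next emission, take the next upstream element;
            // if upstream is exhausted, the stage completes.
            if (!e.MoveNext()) yield break;
            last = e.Current;
        }
    }

    public static void Main()
    {
        var source1 = new[] { 1, 2, 3, 4 };

        // Simplified repro: Zip ends when either side completes.
        foreach (var pair in source1.Zip(Emissions(new[] { "C", "F" })))
            Console.WriteLine(pair);
        // (1, C)
        // (2, C)
        // (3, F)

        // Original repro: the single-element source ends the whole graph early.
        var rows = source1
            .Zip(Emissions(new[] { "Celsius", "Fahrenheit" }))
            .Zip(Emissions(new[] { 10.0 }), (a, b) => (a.First, a.Second, b));
        foreach (var row in rows)
            Console.WriteLine(row);
        // (1, Celsius, 10)
        // (2, Celsius, 10)
    }
}
```

In this model the single-element source yields 10 twice and then completes, which ends the outer zip after two rows, before "Fahrenheit" is reached. That matches the output reported in the issue.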

@Arkatufus
Contributor

Let's say that, in the original reproduction case, we make source2 and source3 never complete the stage by changing the code to this:

var source1 = Source.From(new[] { 1, 2, 3, 4, 5, 6 });
var source2 = Source.From(() => RepeatLastElement(new[] { "A", "B", "C", "D" }));
var source3 = Source.From(() => RepeatLastElement(new[] { 10.0, 20.0 }));
var source4 = source1
    .Zip(source2.RepeatPrevious())
    .ZipWith(source3.RepeatPrevious(), (a, b) => (a.Item1, a.Item2, b));

await source4.RunForeach(c => Output.WriteLine(c.ToString()), Materializer);
return;

async IAsyncEnumerable<T> RepeatLastElement<T>(T[] array)
{
    if (array.Length == 0)
        throw new Exception("array must contain at least 1 element");
    var lastIdx = array.Length - 1;
    var idx = 0;
    while (true)
    {
        yield return array[idx];
        idx++;
        if (idx > lastIdx)
            idx = lastIdx;
    }
}

The output became what we (kind of) expected:

(1, A, 10)
(2, A, 10)
(3, B, 20)
(4, C, 20)
(5, D, 20)
(6, D, 20)

The first element of any source tagged with .RepeatPrevious() will always get duplicated. Should this be fixed?

@Arkatufus
Contributor

I guess not, because the spec itself specifies that there will always be a side effect of a repeated element. We can mitigate this by introducing a default value initializer to .RepeatPrevious().

@Aaronontheweb
Member Author

@Arkatufus semantically, I think both code samples should behave identically - that we're never moving past the initial elements onto the later ones is a bug. RepeatPrevious should be pulling new values and it's not.

@Arkatufus
Contributor

No, that is impossible, because source3 is a single-element source and it will complete the stage after emitting its first element.

@Aaronontheweb
Member Author

> We can mitigate this by introducing a default value initializer to .RepeatPrevious()

This is also the wrong solution for fixing this issue - upstreams aren't being pulled, by the looks of it. That the stream terminates without exhausting the values of any of the original Sources is definitely not correct either.

@Aaronontheweb
Member Author

> No, that is impossible because source3 is a single source and it will complete the stage after emitting the first element

I guess that's a fair point, but then why do we get 2 elements instead of 1?

@Aaronontheweb
Member Author

I guess we'd need to wrap the source inside a "Repeat" behavior, similar to Recover, in order to stop that source from terminating.
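
As a rough illustration of that idea, the sketch below uses a plain C# (LINQ-only) model rather than Akka.Streams itself. All names are hypothetical: `Emissions` models the RepeatPrevious behavior described in this thread (first element forwarded, stored value pushed on every later pull, completion once upstream is exhausted), and `RepeatLastForever` models a wrapper that keeps a source from terminating by replaying its final element. Whether an equivalent Akka.Streams composition behaves identically has not been verified here.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class KeepAliveModel
{
    // Hypothetical model of RepeatPrevious as described in this thread;
    // not the Akka.Streams implementation.
    public static IEnumerable<T> Emissions<T>(IEnumerable<T> upstream)
    {
        using var e = upstream.GetEnumerator();
        if (!e.MoveNext()) yield break;   // empty upstream: nothing to emit
        var last = e.Current;
        yield return last;                // first pull forwards the first element
        while (true)
        {
            yield return last;            // later pulls push the stored value
            if (!e.MoveNext()) yield break; // upstream exhausted: complete
            last = e.Current;
        }
    }

    // Hypothetical "repeat" wrapper: once the source runs out, keep
    // replaying its final element so the upstream never completes.
    public static IEnumerable<T> RepeatLastForever<T>(IEnumerable<T> source)
    {
        using var e = source.GetEnumerator();
        if (!e.MoveNext()) yield break;
        var last = e.Current;
        while (true)
        {
            yield return last;
            if (e.MoveNext()) last = e.Current;
        }
    }

    public static void Main()
    {
        var source1 = new[] { 1, 2, 3, 4 };
        var source2 = RepeatLastForever(new[] { "Celsius", "Fahrenheit" });
        var source3 = RepeatLastForever(new[] { 10.0 });

        var rows = source1
            .Zip(Emissions(source2))
            .Zip(Emissions(source3), (a, b) => (a.First, a.Second, b));

        foreach (var row in rows)
            Console.WriteLine(row);
        // (1, Celsius, 10)
        // (2, Celsius, 10)
        // (3, Fahrenheit, 10)
        // (4, Fahrenheit, 10)
    }
}
```

In this model the zip now runs until source1 is exhausted, so all four elements appear. The first element of each repeated source is still emitted twice.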

@Arkatufus
Contributor

> I guess that's a fair point, but then why do we get 2 elements instead of 1?

https://github.com/akkadotnet/akka.net/blob/dev/src/core/Akka.Streams.Tests/Dsl/ReuseLatestSpec.cs#L30-L46

The 2 repeating elements are part of the spec

@Arkatufus
Contributor

"When there were no buffered last value, we will pull", that causes the double first element. A default value will mitigate that, at least it gives the user some agency in detecting that nothing was pulled yet

@Aaronontheweb
Member Author

> > I guess that's a fair point, but then why do we get 2 elements instead of 1?
>
> https://github.com/akkadotnet/akka.net/blob/dev/src/core/Akka.Streams.Tests/Dsl/ReuseLatestSpec.cs#L30-L46
>
> The 2 repeating elements are part of the spec

Ugh, yeah I remember reasoning about this in the original implementation now - it takes an extra pull to kill it when the upstream is dead. And the reason why the graph terminates is because we don't want to keep a dead Source alive forever. Ok, you're 100% right - this is working as intended even though it looks odd from the outside.

@Aaronontheweb Aaronontheweb removed this from the 1.5.22 milestone Jun 1, 2024