Skip to content

Commit

Permalink
Akka.Streams: ReuseLatest stage to repeatedly emit the most recent …
Browse files Browse the repository at this point in the history
…value until a newer one is pushed (#6262)

* code cleanup in Akka.Streams `Attributes`

* added `RepeatPrevious{T}` stage

* WIP - debugging `RepeatPreviousSpecs`

* fixed tests and added documentation

* fixed documentation

* API approvals

* fixed markdown linting

* removed `SwapPrevious<T>` delegate.

* renamed stage from `RepeatPrevious` to `ReuseLatest`

* remove BDN results
  • Loading branch information
Aaronontheweb authored Nov 28, 2022
1 parent d914eb3 commit 76c9364
Show file tree
Hide file tree
Showing 9 changed files with 316 additions and 8 deletions.
29 changes: 29 additions & 0 deletions docs/articles/streams/buffersandworkingwithrate.md
Original file line number Diff line number Diff line change
Expand Up @@ -189,3 +189,32 @@ var driftFlow = Flow.Create<int>()
```

Note that all of the elements coming from upstream will go through expand at least once. This means that the output of this flow is going to report a drift of zero if producer is fast enough, or a larger drift otherwise.

### Reusing Values Downstream

When working with fan-in stages such as `Zip` where one `Source<T>` might produce messages infrequently, it can be helpful to cache the previous value and re-use it in combination with a faster stream.

For instance, consider the following scenario:

* A `Source<HttpClient, NotUsed>` that emits an updated `HttpClient` with new bearer-token credentials every 30 minutes and
* A `Source<HttpRequestMessage, NotUsed>` that emits outbound `HttpRequestMessage`s as they come - at any given moment it can produce zero requests per second or thousands of requests per second.

In this scenario we're going to want to combine the `Source<HttpClient, NotUsed>` and `Source<HttpRequestMessage, NotUsed>` together so the `HttpClient` can execute all of the `HttpRequestMessage`s - however, given that `HttpClient`s are only emitted once every 30 minutes - how can we use a stage like `Zip` to make sure that every `HttpRequestMessage` gets serviced in a timely, low-latency fashion?

Enter the `ReuseLatest` stage - which will allow us to reuse the most recent `HttpClient` each time a new `HttpRequestMessage` arrives:

```csharp
public static Source<HttpClient, ICancelable> CreateSourceInternal(string clientId,
Func<Task<string>> tokenProvider, TimeSpan tokenRefreshTimeout)
{
var source = Source.Tick(TimeSpan.Zero, TimeSpan.FromSeconds(30), clientId)
.SelectAsync(1, async c =>
// refresh bearer token, create new HttpClient
CreateClient(c, (await tokenProvider().WaitAsync(tokenRefreshTimeout))))
// reuse the previous value whenever there's downstream demand
.ReuseLatest();
return source;
}
```

This type of design allows us to decouple the rate at which `HttpClient`s are produced from the rate at which `HttpRequestMessage`s are.
18 changes: 18 additions & 0 deletions docs/articles/streams/builtinstages.md
Original file line number Diff line number Diff line change
Expand Up @@ -667,6 +667,24 @@ Skip elements as long as a predicate function return true for the element

**completes** when upstream completes

### ReuseLatest

Re-use the most recently emitted element downstream.

> [!NOTE]
> `ReuseLatest` is typically used in combination with fan-in stages such as `Zip` - please see "[Reusing Values Downstream](xref:streams-buffers#reusing-values-downstream)"
**emits** as long as one element has been emitted from upstream, that element will be emitted downstream
whenever the `ReuseLatest` stage is pulled. If a new value is emitted from upstream, that value will be pushed and will replace the previous value.

**backpressures** when downstream backpressures.

**completes** when upstream completes

`ReuseLatest` Sample:

[!code-csharp[ReuseLatest](../../../src/core/Akka.Streams.Tests/Dsl/ReuseLatestSpec.cs?name=RepeatPrevious)]

### Recover

Allow sending of one last element downstream when a failure has happened upstream.
Expand Down
3 changes: 1 addition & 2 deletions docs/articles/streams/pipeliningandparallelism.md
Original file line number Diff line number Diff line change
Expand Up @@ -173,5 +173,4 @@ compared to the parallel pipelines. This pattern re-balances after each step, wh
at the entry point of the pipeline. This only matters however if the processing time distribution has a large
deviation.

[^foot-note-1]: Bartosz's reason for this seemingly suboptimal procedure is that he prefers the temperature of the second pan
to be slightly lower than the first in order to achieve a more homogeneous result.
[^foot-note-1]: Bartosz's reason for this seemingly suboptimal procedure is that he prefers the temperature of the second pan to be slightly lower than the first in order to achieve a more homogeneous result.
12 changes: 12 additions & 0 deletions src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.verified.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1383,6 +1383,8 @@ namespace Akka.Streams.Dsl
[System.ObsoleteAttribute("Use RecoverWithRetries instead. [1.1.2]")]
public static Akka.Streams.Dsl.Flow<TIn, TOut, TMat> RecoverWith<TIn, TOut, TMat>(this Akka.Streams.Dsl.Flow<TIn, TOut, TMat> flow, System.Func<System.Exception, Akka.Streams.IGraph<Akka.Streams.SourceShape<TOut>, TMat>> partialFunc) { }
public static Akka.Streams.Dsl.Flow<TIn, TOut, TMat> RecoverWithRetries<TIn, TOut, TMat>(this Akka.Streams.Dsl.Flow<TIn, TOut, TMat> flow, System.Func<System.Exception, Akka.Streams.IGraph<Akka.Streams.SourceShape<TOut>, TMat>> partialFunc, int attempts) { }
public static Akka.Streams.Dsl.Flow<TIn, TOut, TMat> RepeatPrevious<TIn, TOut, TMat>(this Akka.Streams.Dsl.Flow<TIn, TOut, TMat> flow) { }
public static Akka.Streams.Dsl.Flow<TIn, TOut, TMat> RepeatPrevious<TIn, TOut, TMat>(this Akka.Streams.Dsl.Flow<TIn, TOut, TMat> flow, System.Action<TOut, TOut> onItemChanged) { }
public static Akka.Streams.Dsl.Flow<TIn, TOut2, TMat> Scan<TIn, TOut1, TOut2, TMat>(this Akka.Streams.Dsl.Flow<TIn, TOut1, TMat> flow, TOut2 zero, System.Func<TOut2, TOut1, TOut2> scan) { }
public static Akka.Streams.Dsl.Flow<TIn, TOut2, TMat> ScanAsync<TIn, TOut1, TOut2, TMat>(this Akka.Streams.Dsl.Flow<TIn, TOut1, TMat> flow, TOut2 zero, System.Func<TOut2, TOut1, System.Threading.Tasks.Task<TOut2>> scan) { }
public static Akka.Streams.Dsl.Flow<T, TOut, TMat> Select<T, TIn, TOut, TMat>(this Akka.Streams.Dsl.Flow<T, TIn, TMat> flow, System.Func<TIn, TOut> mapper) { }
Expand Down Expand Up @@ -1845,6 +1847,14 @@ namespace Akka.Streams.Dsl
[Akka.Annotations.ApiMayChangeAttribute()]
public static Akka.Streams.IGraph<Akka.Streams.FlowShape<System.ValueTuple<TIn, TState>, System.ValueTuple<Akka.Util.Result<TOut>, TState>>, TMat> Create<TIn, TState, TOut, TMat>(Akka.Streams.IGraph<Akka.Streams.FlowShape<System.ValueTuple<TIn, TState>, System.ValueTuple<Akka.Util.Result<TOut>, TState>>, TMat> flow, System.Func<TState, Akka.Util.Option<System.ValueTuple<TIn, TState>>> retryWith) { }
}
public sealed class ReuseLatest<T> : Akka.Streams.Stage.GraphStage<Akka.Streams.FlowShape<T, T>>
{
public ReuseLatest() { }
public ReuseLatest(System.Action<T, T> onItemChanged) { }
public override Akka.Streams.FlowShape<T, T> Shape { get; }
protected override Akka.Streams.Stage.GraphStageLogic CreateLogic(Akka.Streams.Attributes inheritedAttributes) { }
public override string ToString() { }
}
public class static ReverseOps
{
public static Akka.Streams.Dsl.GraphDsl.Builder<TMat> From<TIn, TOut, TMat>(this Akka.Streams.Dsl.GraphDsl.ReverseOps<TIn, TMat> ops, Akka.Streams.Outlet<TOut> outlet)
Expand Down Expand Up @@ -2059,6 +2069,8 @@ namespace Akka.Streams.Dsl
[System.ObsoleteAttribute("Use RecoverWithRetries instead. [1.1.2]")]
public static Akka.Streams.Dsl.Source<TOut, TMat> RecoverWith<TOut, TMat>(this Akka.Streams.Dsl.Source<TOut, TMat> flow, System.Func<System.Exception, Akka.Streams.IGraph<Akka.Streams.SourceShape<TOut>, TMat>> partialFunc) { }
public static Akka.Streams.Dsl.Source<TOut, TMat> RecoverWithRetries<TOut, TMat>(this Akka.Streams.Dsl.Source<TOut, TMat> flow, System.Func<System.Exception, Akka.Streams.IGraph<Akka.Streams.SourceShape<TOut>, TMat>> partialFunc, int attempts) { }
public static Akka.Streams.Dsl.Source<T, TMat> RepeatPrevious<T, TMat>(this Akka.Streams.Dsl.Source<T, TMat> source) { }
public static Akka.Streams.Dsl.Source<T, TMat> RepeatPrevious<T, TMat>(this Akka.Streams.Dsl.Source<T, TMat> source, System.Action<T, T> onItemUpdated) { }
public static Akka.Streams.Dsl.Source<TOut2, TMat> Scan<TOut1, TOut2, TMat>(this Akka.Streams.Dsl.Source<TOut1, TMat> flow, TOut2 zero, System.Func<TOut2, TOut1, TOut2> scan) { }
public static Akka.Streams.Dsl.Source<TOut2, TMat> ScanAsync<TOut1, TOut2, TMat>(this Akka.Streams.Dsl.Source<TOut1, TMat> flow, TOut2 zero, System.Func<TOut2, TOut1, System.Threading.Tasks.Task<TOut2>> scan) { }
public static Akka.Streams.Dsl.Source<TOut, TMat> Select<TIn, TOut, TMat>(this Akka.Streams.Dsl.Source<TIn, TMat> flow, System.Func<TIn, TOut> mapper) { }
Expand Down
84 changes: 84 additions & 0 deletions src/core/Akka.Streams.Tests/Dsl/ReuseLatestSpec.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
//-----------------------------------------------------------------------
// <copyright file="RepeatPreviousSpec.cs" company="Akka.NET Project">
// Copyright (C) 2013-2022 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using System;
using System.Linq;
using System.Threading.Tasks;
using Akka.Configuration;
using Akka.Streams.Dsl;
using Akka.TestKit;
using FluentAssertions;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Streams.Tests.Dsl
{
public class ReuseLatestSpec : AkkaSpec
{
private ActorMaterializer Materializer { get; }

public ReuseLatestSpec(ITestOutputHelper testOutputHelper) : base(Config.Empty, output: testOutputHelper)
{
var settings = ActorMaterializerSettings.Create(Sys);
Materializer = ActorMaterializer.Create(Sys, settings);
}

[Fact]
public async Task RepeatPrevious_should_immediately_terminate_with_Empty_source()
{
var source = Source.Empty<int>();
var result = await source.RepeatPrevious().RunWith(Sink.Seq<int>(), Materializer);
result.Should().BeEmpty();
}

[Fact]
public async Task RepeatPrevious_should_complete_when_upstream_completes()
{
var source = Source.Single(1).RepeatPrevious();
var result = await source.RunWith(Sink.Seq<int>(), Materializer);

// as a side-effect of RepeatPrevious' buffering process, there's going to be an extra element in the result
result.Should().BeEquivalentTo(1, 1);
}

[Fact]
public async Task RepeatPrevious_should_fail_when_upstream_fails()
{
Func<Task> Exec() => async () =>
{
var source = Source.From(Enumerable.Range(0,9)).Where(i =>
{
if (i % 5 == 0)
{
throw new ApplicationException("failed");
}

return true;
}).RepeatPrevious();
var result = await source.RunWith(Sink.Seq<int>(), Materializer);
};

await Exec().Should().ThrowAsync<ApplicationException>();
}

[Fact]
public async Task RepeatPrevious_should_repeat_when_no_newValues_available()
{
// <RepeatPrevious>
var (queue, source) = Source.Queue<int>(10, OverflowStrategy.Backpressure).PreMaterialize(Materializer);

// populate 1 into queue
await queue.OfferAsync(1);

// take 4 items from the queue
var result = await source.RepeatPrevious().Take(4).RunWith(Sink.Seq<int>(), Materializer);

// the most recent queue item will be repeated 3 times, plus the original element
result.Should().BeEquivalentTo(1,1,1,1);
// </RepeatPrevious>
}
}
}
8 changes: 3 additions & 5 deletions src/core/Akka.Streams/Attributes.cs
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ public Attributes(params IAttribute[] attributes)
/// INTERNAL API
/// </summary>
internal bool IsAsync
=> _attributes.Count() > 0 &&
=> _attributes.Length > 0 &&
_attributes.Any(
attr => attr is AsyncBoundary ||
attr is ActorAttributes.Dispatcher);
Expand Down Expand Up @@ -489,9 +489,7 @@ public static Attributes CreateLogLevels(LogLevel onElement = LogLevel.DebugLeve
/// <returns>TBD</returns>
public static string ExtractName(IModule module, string defaultIfNotFound)
{
var copy = module as CopiedModule;

return copy != null
return module is CopiedModule copy
? copy.Attributes.And(copy.CopyOf.Attributes).GetNameOrDefault(defaultIfNotFound)
: module.Attributes.GetNameOrDefault(defaultIfNotFound);
}
Expand Down Expand Up @@ -531,7 +529,7 @@ public bool Equals(Dispatcher other)
return true;
return Equals(Name, other.Name);
}
public override bool Equals(object obj) => obj is Dispatcher && Equals((Dispatcher)obj);
public override bool Equals(object obj) => obj is Dispatcher dispatcher && Equals(dispatcher);
public override int GetHashCode() => Name?.GetHashCode() ?? 0;
public override string ToString() => $"Dispatcher({Name})";
}
Expand Down
39 changes: 38 additions & 1 deletion src/core/Akka.Streams/Dsl/FlowOperations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
namespace Akka.Streams.Dsl
{
/// <summary>
/// TBD
/// The set of DSL methods for composing <see cref="Flow{TIn,TOut,TMat}"/> stages together.
/// </summary>
public static class FlowOperations
{
Expand Down Expand Up @@ -2438,5 +2438,42 @@ public static FlowWithContext<TIn, TCtxIn, TOut, TCtxOut, TMat> AsFlowWithContex

return FlowWithContext.From(flowWithTuples);
}

/// <summary>
/// Repeats the previous element from upstream until it's replaced by a new value.
/// </summary>
/// <param name="flow">The previous Flow stage in this stream.</param>
/// <typeparam name="TIn">The input type.</typeparam>
/// <typeparam name="TOut">The output type.</typeparam>
/// <typeparam name="TMat">The materialization type.</typeparam>
/// <remarks>
/// This is designed to allow fan-in stages where output from one of the sources is intermittent / infrequent
/// and users just want the previous value to be reused.
/// </remarks>
public static Flow<TIn, TOut, TMat> RepeatPrevious<TIn, TOut, TMat>(this Flow<TIn, TOut, TMat> flow)
{
return flow.Via(new ReuseLatest<TOut>());
}

/// <summary>
/// Repeats the previous element from upstream until it's replaced by a new value.
/// </summary>
/// <param name="flow">The previous Flow stage in this stream.</param>
/// <param name="onItemChanged">A <see cref="Action{TOut, TOut}"/> function that allows the stage to perform clean-up operations when the previously repeated
/// value is being replaced.
///
/// This is used for things like calling <see cref="IDisposable.Dispose"/> on the previous value.
/// </param>
/// <typeparam name="TIn">The input type.</typeparam>
/// <typeparam name="TOut">The output type.</typeparam>
/// <typeparam name="TMat">The materialization type.</typeparam>
/// <remarks>
/// This is designed to allow fan-in stages where output from one of the sources is intermittent / infrequent
/// and users just want the previous value to be reused.
/// </remarks>
public static Flow<TIn, TOut, TMat> RepeatPrevious<TIn, TOut, TMat>(this Flow<TIn, TOut, TMat> flow, Action<TOut, TOut> onItemChanged)
{
return flow.Via(new ReuseLatest<TOut>(onItemChanged));
}
}
}
96 changes: 96 additions & 0 deletions src/core/Akka.Streams/Dsl/ReuseLatest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
//-----------------------------------------------------------------------
// <copyright file="RepeatPrevious.cs" company="Akka.NET Project">
// Copyright (C) 2013-2022 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using System;
using Akka.Streams.Stage;
using Akka.Util;

namespace Akka.Streams.Dsl
{
/// <summary>
/// Reuses the latest element from upstream until it's replaced by a new value.
///
/// This is designed to allow fan-in stages where output from one of the sources is intermittent / infrequent
/// and users just want the previous value to be reused.
/// </summary>
/// <typeparam name="T">The output type.</typeparam>
public sealed class ReuseLatest<T> : GraphStage<FlowShape<T, T>>
{
private readonly Inlet<T> _in = new Inlet<T>("RepeatPrevious.in");
private readonly Outlet<T> _out = new Outlet<T>("RepeatPrevious.out");

public override FlowShape<T, T> Shape => new FlowShape<T, T>(_in, _out);
private readonly Action<T,T> _onItemChanged;

/// <summary>
/// Do nothing by default
/// </summary>
private static readonly Action<T,T> DefaultSwap = (oldValue, newValue) => { };

public ReuseLatest() : this(DefaultSwap)
{
}

public ReuseLatest(Action<T, T> onItemChanged)
{
_onItemChanged = onItemChanged;
}

protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes) =>
new Logic(this, _onItemChanged);

private sealed class Logic : InAndOutGraphStageLogic
{
private readonly ReuseLatest<T> _stage;
private Option<T> _last;
private readonly Action<T,T> _onItemChanged;

public Logic(ReuseLatest<T> stage, Action<T,T> onItemChanged) : base(stage.Shape)
{
_stage = stage;
_onItemChanged = onItemChanged;

SetHandler(_stage._in, this);
SetHandler(_stage._out, this);
}

public override void OnPush()
{
var next = Grab(_stage._in);
if (_last.HasValue)
_onItemChanged(_last.Value, next);
_last = next;

if (IsAvailable(_stage._out))
{
Push(_stage._out, _last.Value);
}
}

public override void OnPull()
{
if (_last.HasValue)
{
if (!HasBeenPulled(_stage._in))
{
Pull(_stage._in);
}

Push(_stage._out, _last.Value);
}
else
{
Pull(_stage._in);
}
}
}

public override string ToString()
{
return "RepeatPrevious";
}
}
}
Loading

0 comments on commit 76c9364

Please sign in to comment.