Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

4.x: Improve the performance of Repeat() #688

Merged
merged 4 commits into from
Jun 29, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ private static void Main()
typeof(SwitchBenchmark),
typeof(BufferCountBenchmark),
typeof(RangeBenchmark),
typeof(RepeatBenchmark),
typeof(AppendPrependBenchmark)
});

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.

using System;
using System.Reactive.Linq;
using System.Threading;
using BenchmarkDotNet.Attributes;

namespace Benchmarks.System.Reactive
{
[MemoryDiagnoser]
public class RepeatBenchmark
{
[Params(1, 10, 100, 1000, 10000, 100000, 1000000)]
public int N;

public int _store;

[Benchmark]
public void Repeat_Infinite()
{
Observable.Repeat(1).Take(N).Subscribe(v => Volatile.Write(ref _store, v));
}

[Benchmark]
public void Repeat_Finite()
{
Observable.Repeat(1, N).Subscribe(v => Volatile.Write(ref _store, v));
}
}
}
155 changes: 122 additions & 33 deletions Rx.NET/Source/src/System.Reactive/Linq/Observable/Repeat.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,48 +9,88 @@ namespace System.Reactive.Linq.ObservableImpl
{
internal static class Repeat<TResult>
{
internal sealed class Forever : Producer<TResult, Forever._>
internal sealed class ForeverRecursive : Producer<TResult, ForeverRecursive._>
{
private readonly TResult _value;
private readonly IScheduler _scheduler;

public Forever(TResult value, IScheduler scheduler)
public ForeverRecursive(TResult value, IScheduler scheduler)
{
_value = value;
_scheduler = scheduler;
}

protected override _ CreateSink(IObserver<TResult> observer) => new _(_value, observer);

protected override void Run(_ sink) => sink.Run(this);
protected override void Run(_ sink) => sink.Run(_scheduler);

internal sealed class _ : IdentitySink<TResult>
{
private readonly TResult _value;

IDisposable _task;

public _(TResult value, IObserver<TResult> observer)
: base(observer)
{
_value = value;
}

public void Run(Forever parent)
public void Run(IScheduler scheduler)
{
var longRunning = parent._scheduler.AsLongRunning();
if (longRunning != null)
{
SetUpstream(longRunning.ScheduleLongRunning(this, (@this, c) => @this.LoopInf(c)));
}
else
var first = scheduler.Schedule(this, (innerScheduler, @this) => @this.LoopRecInf(innerScheduler));
Disposable.TrySetSingle(ref _task, first);
}

protected override void Dispose(bool disposing)
{
base.Dispose(disposing);
if (disposing)
{
SetUpstream(parent._scheduler.Schedule(this, (@this, a) => @this.LoopRecInf(a)));
Disposable.TryDispose(ref _task);
}
}

private void LoopRecInf(Action<_> recurse)
private IDisposable LoopRecInf(IScheduler scheduler)
{
ForwardOnNext(_value);
recurse(this);

var next = scheduler.Schedule(this, (innerScheduler, @this) => @this.LoopRecInf(innerScheduler));
Disposable.TrySetMultiple(ref _task, next);

return Disposable.Empty;
}
}
}

internal sealed class ForeverLongRunning : Producer<TResult, ForeverLongRunning._>
{
private readonly TResult _value;
private readonly ISchedulerLongRunning _scheduler;

public ForeverLongRunning(TResult value, ISchedulerLongRunning scheduler)
{
_value = value;
_scheduler = scheduler;
}

protected override _ CreateSink(IObserver<TResult> observer) => new _(_value, observer);

protected override void Run(_ sink) => sink.Run(_scheduler);

internal sealed class _ : IdentitySink<TResult>
{
private readonly TResult _value;

public _(TResult value, IObserver<TResult> observer)
: base(observer)
{
_value = value;
}

public void Run(ISchedulerLongRunning longRunning)
{
SetUpstream(longRunning.ScheduleLongRunning(this, (@this, c) => @this.LoopInf(c)));
}

private void LoopInf(ICancelable cancel)
Expand All @@ -66,66 +106,115 @@ private void LoopInf(ICancelable cancel)
}
}

internal sealed class Count : Producer<TResult, Count._>
internal sealed class CountRecursive : Producer<TResult, CountRecursive._>
{
private readonly TResult _value;
private readonly IScheduler _scheduler;
private readonly int _repeatCount;

public Count(TResult value, int repeatCount, IScheduler scheduler)
public CountRecursive(TResult value, int repeatCount, IScheduler scheduler)
{
_value = value;
_scheduler = scheduler;
_repeatCount = repeatCount;
}

protected override _ CreateSink(IObserver<TResult> observer) => new _(_value, observer);
protected override _ CreateSink(IObserver<TResult> observer) => new _(_value, _repeatCount, observer);

protected override void Run(_ sink) => sink.Run(this);
protected override void Run(_ sink) => sink.Run(_scheduler);

internal sealed class _ : IdentitySink<TResult>
{
private readonly TResult _value;

public _(TResult value, IObserver<TResult> observer)
int _remaining;

IDisposable _task;

public _(TResult value, int repeatCount, IObserver<TResult> observer)
: base(observer)
{
_value = value;
_remaining = repeatCount;
}

public void Run(Count parent)
public void Run(IScheduler scheduler)
{
var longRunning = parent._scheduler.AsLongRunning();
if (longRunning != null)
{
SetUpstream(longRunning.ScheduleLongRunning(parent._repeatCount, Loop));
}
else
var first = scheduler.Schedule(this, (innerScheduler, @this) => @this.LoopRec(innerScheduler));
Disposable.TrySetSingle(ref _task, first);
}

protected override void Dispose(bool disposing)
{
base.Dispose(disposing);
if (disposing)
{
SetUpstream(parent._scheduler.Schedule(parent._repeatCount, LoopRec));
Disposable.TryDispose(ref _task);
}
}

private void LoopRec(int n, Action<int> recurse)
private IDisposable LoopRec(IScheduler scheduler)
{
if (n > 0)
var remaining = _remaining;
if (remaining > 0)
{
ForwardOnNext(_value);
n--;
_remaining = --remaining;
}

if (n == 0)
if (remaining == 0)
{
ForwardOnCompleted();
return;
}
else
{
var next = scheduler.Schedule(this, (innerScheduler, @this) => @this.LoopRec(innerScheduler));
Disposable.TrySetMultiple(ref _task, next);
}
return Disposable.Empty;
}
}
}

recurse(n);
internal sealed class CountLongRunning : Producer<TResult, CountLongRunning._>
{
private readonly TResult _value;
private readonly ISchedulerLongRunning _scheduler;
private readonly int _repeatCount;

public CountLongRunning(TResult value, int repeatCount, ISchedulerLongRunning scheduler)
{
_value = value;
_scheduler = scheduler;
_repeatCount = repeatCount;
}

protected override _ CreateSink(IObserver<TResult> observer) => new _(_value, _repeatCount, observer);

protected override void Run(_ sink) => sink.Run(_scheduler);

internal sealed class _ : IdentitySink<TResult>
{
private readonly TResult _value;

int _remaining;

public _(TResult value, int remaining, IObserver<TResult> observer)
: base(observer)
{
_value = value;
_remaining = remaining;
}

public void Run(ISchedulerLongRunning longRunning)
{
SetUpstream(longRunning.ScheduleLongRunning(this, (@this, cancel) => @this.Loop(cancel)));
}

private void Loop(int n, ICancelable cancel)
private void Loop(ICancelable cancel)
{
var value = _value;
var n = _remaining;
while (n > 0 && !cancel.IsDisposed)
{
ForwardOnNext(value);
Expand Down
28 changes: 24 additions & 4 deletions Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Creation.cs
Original file line number Diff line number Diff line change
Expand Up @@ -347,22 +347,42 @@ private static IObservable<int> Range_(int start, int count, IScheduler schedule

public virtual IObservable<TResult> Repeat<TResult>(TResult value)
{
return new Repeat<TResult>.Forever(value, SchedulerDefaults.Iteration);
return Repeat_(value, SchedulerDefaults.Iteration);
}

public virtual IObservable<TResult> Repeat<TResult>(TResult value, IScheduler scheduler)
{
return new Repeat<TResult>.Forever(value, scheduler);
return Repeat_(value, scheduler);
}

private IObservable<TResult> Repeat_<TResult>(TResult value, IScheduler scheduler)
{
var longRunning = scheduler.AsLongRunning();
if (longRunning != null)
{
return new Repeat<TResult>.ForeverLongRunning(value, longRunning);
}
return new Repeat<TResult>.ForeverRecursive(value, scheduler);
}

public virtual IObservable<TResult> Repeat<TResult>(TResult value, int repeatCount)
{
return new Repeat<TResult>.Count(value, repeatCount, SchedulerDefaults.Iteration);
return Repeat_(value, repeatCount, SchedulerDefaults.Iteration);
}

public virtual IObservable<TResult> Repeat<TResult>(TResult value, int repeatCount, IScheduler scheduler)
{
return new Repeat<TResult>.Count(value, repeatCount, scheduler);
return Repeat_(value, repeatCount, scheduler);
}

private IObservable<TResult> Repeat_<TResult>(TResult value, int repeatCount, IScheduler scheduler)
{
var longRunning = scheduler.AsLongRunning();
if (longRunning != null)
{
return new Repeat<TResult>.CountLongRunning(value, repeatCount, longRunning);
}
return new Repeat<TResult>.CountRecursive(value, repeatCount, scheduler);
}

#endregion
Expand Down