Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More static lambdas in Rx. #1360

Merged
merged 2 commits into from
Oct 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ public Timer(Action<object?> action, object? state, TimeSpan dueTime)
_state = state;
_action = action;

Disposable.SetSingle(ref _timer, new System.Threading.Timer(@this => Tick(@this!), this, dueTime, TimeSpan.FromMilliseconds(Timeout.Infinite)));
Disposable.SetSingle(ref _timer, new System.Threading.Timer(static @this => Tick(@this!), this, dueTime, TimeSpan.FromMilliseconds(Timeout.Infinite)));
}

private static void Tick(object state)
Expand Down Expand Up @@ -207,7 +207,7 @@ public PeriodicTimer(Action action, TimeSpan period)
// Rooting of the timer happens through the timer's state
// which is the current instance and has a field to store the Timer instance.
//
_timer = new System.Threading.Timer(@this => Tick(@this!), this, period, period);
_timer = new System.Threading.Timer(static @this => Tick(@this!), this, period, period);
}

private static void Tick(object state)
Expand Down Expand Up @@ -239,7 +239,7 @@ public FastPeriodicTimer(Action action)
{
_action = action;

new Thread(@this => Loop(@this!))
new Thread(static @this => Loop(@this!))
{
Name = "Rx-FastPeriodicTimer",
IsBackground = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ private void InvokeNext(TState state, TimeSpan time)
var sad = new SingleAssignmentDisposable();

Group.Add(sad);
sad.Disposable = Scheduler.ScheduleAction((state, sad, @this: this), time, nextState => {
sad.Disposable = Scheduler.ScheduleAction((state, sad, @this: this), time, static nextState => {
nextState.@this.Group.Remove(nextState.sad);
nextState.@this.InvokeFirst(nextState.state);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ public IDisposable Connect(IObserver<TArgs> observer)
{
if (--@this._count == 0)
{
closureParent._scheduler.ScheduleAction(@this._removeHandler, handler => handler.Dispose());
closureParent._scheduler.ScheduleAction(@this._removeHandler, static handler => handler.Dispose());
closureParent._session = null;
}
}
Expand Down
18 changes: 8 additions & 10 deletions Rx.NET/Source/src/System.Reactive/Linq/Observable/Generate.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@
// The .NET Foundation licenses this file to you under the MIT License.
// See the LICENSE file in the project root for more information.

#nullable disable

using System.Reactive.Concurrency;
using System.Reactive.Disposables;

Expand Down Expand Up @@ -52,16 +50,16 @@ public _(NoTime parent, IObserver<TResult> observer)
private TState _state;
private bool _first;

public void Run(IScheduler _scheduler)
public void Run(IScheduler scheduler)
{
var longRunning = _scheduler.AsLongRunning();
var longRunning = scheduler.AsLongRunning();
if (longRunning != null)
{
SetUpstream(longRunning.ScheduleLongRunning(this, (@this, c) => @this.Loop(c)));
SetUpstream(longRunning.ScheduleLongRunning(this, static (@this, c) => @this.Loop(c)));
}
else
{
SetUpstream(_scheduler.Schedule(this, (@this, a) => @this.LoopRec(a)));
SetUpstream(scheduler.Schedule(this, static (@this, a) => @this.LoopRec(a)));
}
}

Expand Down Expand Up @@ -205,7 +203,7 @@ public void Run(IScheduler outerScheduler, TState initialState)
{
var timer = new SingleAssignmentDisposable();
Disposable.TrySetMultiple(ref _timerDisposable, timer);
timer.Disposable = outerScheduler.Schedule((@this: this, initialState), (scheduler, tuple) => tuple.@this.InvokeRec(scheduler, tuple.initialState));
timer.Disposable = outerScheduler.Schedule((@this: this, initialState), static (scheduler, tuple) => tuple.@this.InvokeRec(scheduler, tuple.initialState));
}

protected override void Dispose(bool disposing)
Expand Down Expand Up @@ -256,7 +254,7 @@ private IDisposable InvokeRec(IScheduler self, TState state)

var timer = new SingleAssignmentDisposable();
Disposable.TrySetMultiple(ref _timerDisposable, timer);
timer.Disposable = self.Schedule((@this: this, state), time, (scheduler, tuple) => tuple.@this.InvokeRec(scheduler, tuple.state));
timer.Disposable = self.Schedule((@this: this, state), time, static (scheduler, tuple) => tuple.@this.InvokeRec(scheduler, tuple.state));

return Disposable.Empty;
}
Expand Down Expand Up @@ -314,7 +312,7 @@ public void Run(IScheduler outerScheduler, TState initialState)
{
var timer = new SingleAssignmentDisposable();
Disposable.TrySetMultiple(ref _timerDisposable, timer);
timer.Disposable = outerScheduler.Schedule((@this: this, initialState), (scheduler, tuple) => tuple.@this.InvokeRec(scheduler, tuple.initialState));
timer.Disposable = outerScheduler.Schedule((@this: this, initialState), static (scheduler, tuple) => tuple.@this.InvokeRec(scheduler, tuple.initialState));
}

protected override void Dispose(bool disposing)
Expand Down Expand Up @@ -365,7 +363,7 @@ private IDisposable InvokeRec(IScheduler self, TState state)

var timer = new SingleAssignmentDisposable();
Disposable.TrySetMultiple(ref _timerDisposable, timer);
timer.Disposable = self.Schedule((@this: this, state), time, (scheduler, tuple) => tuple.@this.InvokeRec(scheduler, tuple.state));
timer.Disposable = self.Schedule((@this: this, state), time, static (scheduler, tuple) => tuple.@this.InvokeRec(scheduler, tuple.state));

return Disposable.Empty;
}
Expand Down
6 changes: 3 additions & 3 deletions Rx.NET/Source/src/System.Reactive/Linq/Observable/Range.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public RangeSink(int start, int count, IObserver<int> observer)

public void Run(IScheduler scheduler)
{
var first = scheduler.Schedule(this, (innerScheduler, @this) => @this.LoopRec(innerScheduler));
var first = scheduler.Schedule(this, static (innerScheduler, @this) => @this.LoopRec(innerScheduler));
Disposable.TrySetSingle(ref _task, first);
}

Expand All @@ -61,7 +61,7 @@ private IDisposable LoopRec(IScheduler scheduler)
{
_index = idx + 1;
ForwardOnNext(idx);
var next = scheduler.Schedule(this, (innerScheduler, @this) => @this.LoopRec(innerScheduler));
var next = scheduler.Schedule(this, static (innerScheduler, @this) => @this.LoopRec(innerScheduler));
Disposable.TrySetMultiple(ref _task, next);
}
else
Expand Down Expand Up @@ -104,7 +104,7 @@ public RangeSink(int start, int count, IObserver<int> observer)

public void Run(ISchedulerLongRunning scheduler)
{
SetUpstream(scheduler.ScheduleLongRunning(this, (@this, cancel) => @this.Loop(cancel)));
SetUpstream(scheduler.ScheduleLongRunning(this, static (@this, cancel) => @this.Loop(cancel)));
}

private void Loop(ICancelable cancel)
Expand Down
12 changes: 6 additions & 6 deletions Rx.NET/Source/src/System.Reactive/Linq/Observable/Repeat.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public _(TResult value, IObserver<TResult> observer)

public void Run(IScheduler scheduler)
{
var first = scheduler.Schedule(this, (innerScheduler, @this) => @this.LoopRecInf(innerScheduler));
var first = scheduler.Schedule(this, static (innerScheduler, @this) => @this.LoopRecInf(innerScheduler));
Disposable.TrySetSingle(ref _task, first);
}

Expand All @@ -57,7 +57,7 @@ private IDisposable LoopRecInf(IScheduler scheduler)
{
ForwardOnNext(_value);

var next = scheduler.Schedule(this, (innerScheduler, @this) => @this.LoopRecInf(innerScheduler));
var next = scheduler.Schedule(this, static (innerScheduler, @this) => @this.LoopRecInf(innerScheduler));
Disposable.TrySetMultiple(ref _task, next);

return Disposable.Empty;
Expand Down Expand Up @@ -92,7 +92,7 @@ public _(TResult value, IObserver<TResult> observer)

public void Run(ISchedulerLongRunning longRunning)
{
SetUpstream(longRunning.ScheduleLongRunning(this, (@this, c) => @this.LoopInf(c)));
SetUpstream(longRunning.ScheduleLongRunning(this, static (@this, c) => @this.LoopInf(c)));
}

private void LoopInf(ICancelable cancel)
Expand Down Expand Up @@ -142,7 +142,7 @@ public _(TResult value, int repeatCount, IObserver<TResult> observer)

public void Run(IScheduler scheduler)
{
var first = scheduler.Schedule(this, (innerScheduler, @this) => @this.LoopRec(innerScheduler));
var first = scheduler.Schedule(this, static (innerScheduler, @this) => @this.LoopRec(innerScheduler));
Disposable.TrySetSingle(ref _task, first);
}

Expand Down Expand Up @@ -170,7 +170,7 @@ private IDisposable LoopRec(IScheduler scheduler)
}
else
{
var next = scheduler.Schedule(this, (innerScheduler, @this) => @this.LoopRec(innerScheduler));
var next = scheduler.Schedule(this, static (innerScheduler, @this) => @this.LoopRec(innerScheduler));
Disposable.TrySetMultiple(ref _task, next);
}
return Disposable.Empty;
Expand Down Expand Up @@ -209,7 +209,7 @@ public _(TResult value, int remaining, IObserver<TResult> observer)

public void Run(ISchedulerLongRunning longRunning)
{
SetUpstream(longRunning.ScheduleLongRunning(this, (@this, cancel) => @this.Loop(cancel)));
SetUpstream(longRunning.ScheduleLongRunning(this, static (@this, cancel) => @this.Loop(cancel)));
}

private void Loop(ICancelable cancel)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public _(TResult value, IObserver<TResult> observer)

public void Run(IScheduler scheduler)
{
SetUpstream(scheduler.ScheduleAction(this, @this => @this.Invoke()));
SetUpstream(scheduler.ScheduleAction(this, static @this => @this.Invoke()));
}

private void Invoke()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -592,7 +592,7 @@ public override void OnNext(TSource value)
}
else
{
task.ContinueWithState((t, tuple) => tuple.@this.OnCompletedTask(tuple.value, t), (@this: this, value), _cancel.Token);
task.ContinueWithState(static (t, tuple) => tuple.@this.OnCompletedTask(tuple.value, t), (@this: this, value), _cancel.Token);
}
}

Expand Down Expand Up @@ -750,7 +750,7 @@ public override void OnNext(TSource value)
}
else
{
task.ContinueWithState((t, tuple) => tuple.@this.OnCompletedTask(tuple.value, tuple.index, t), (@this: this, value, index), _cancel.Token);
task.ContinueWithState(static (t, tuple) => tuple.@this.OnCompletedTask(tuple.value, tuple.index, t), (@this: this, value, index), _cancel.Token);
}
}

Expand Down
12 changes: 6 additions & 6 deletions Rx.NET/Source/src/System.Reactive/Linq/Observable/TakeLast.cs
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,11 @@ public override void OnCompleted()
var longRunning = _loopScheduler.AsLongRunning();
if (longRunning != null)
{
Disposable.SetSingle(ref _loopDisposable, longRunning.ScheduleLongRunning(this, (@this, c) => @this.Loop(c)));
Disposable.SetSingle(ref _loopDisposable, longRunning.ScheduleLongRunning(this, static (@this, c) => @this.Loop(c)));
}
else
{
var first = _loopScheduler.Schedule(this, (innerScheduler, @this) => @this.LoopRec(innerScheduler));
var first = _loopScheduler.Schedule(this, static (innerScheduler, @this) => @this.LoopRec(innerScheduler));
Disposable.TrySetSingle(ref _loopDisposable, first);
}
}
Expand All @@ -85,7 +85,7 @@ private IDisposable LoopRec(IScheduler scheduler)
{
ForwardOnNext(_queue.Dequeue());

var next = scheduler.Schedule(this, (innerScheduler, @this) => @this.LoopRec(innerScheduler));
var next = scheduler.Schedule(this, static (innerScheduler, @this) => @this.LoopRec(innerScheduler));
Disposable.TrySetMultiple(ref _loopDisposable, next);
}
else
Expand Down Expand Up @@ -186,11 +186,11 @@ public override void OnCompleted()
var longRunning = _loopScheduler.AsLongRunning();
if (longRunning != null)
{
Disposable.SetSingle(ref _loopDisposable, longRunning.ScheduleLongRunning(this, (@this, c) => @this.Loop(c)));
Disposable.SetSingle(ref _loopDisposable, longRunning.ScheduleLongRunning(this, static (@this, c) => @this.Loop(c)));
}
else
{
var first = _loopScheduler.Schedule(this, (innerScheduler, @this) => @this.LoopRec(innerScheduler));
var first = _loopScheduler.Schedule(this, static (innerScheduler, @this) => @this.LoopRec(innerScheduler));
Disposable.TrySetSingle(ref _loopDisposable, first);
}
}
Expand All @@ -201,7 +201,7 @@ private IDisposable LoopRec(IScheduler scheduler)
{
ForwardOnNext(_queue.Dequeue().Value);

var next = scheduler.Schedule(this, (innerScheduler, @this) => @this.LoopRec(innerScheduler));
var next = scheduler.Schedule(this, static (innerScheduler, @this) => @this.LoopRec(innerScheduler));
Disposable.TrySetMultiple(ref _loopDisposable, next);
}
else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public override void OnNext(TSource value)
}

Disposable.TrySetSerial(ref _serialCancelable, null);
Disposable.TrySetSerial(ref _serialCancelable, _scheduler.ScheduleAction((@this: this, currentid), _dueTime, tuple => tuple.@this.Propagate(tuple.currentid)));
Disposable.TrySetSerial(ref _serialCancelable, _scheduler.ScheduleAction((@this: this, currentid), _dueTime, static tuple => tuple.@this.Propagate(tuple.currentid)));
}

private void Propagate(ulong currentid)
Expand Down
2 changes: 1 addition & 1 deletion Rx.NET/Source/src/System.Reactive/Linq/Observable/Throw.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public _(Exception exception, IObserver<TResult> observer)

public void Run(IScheduler scheduler)
{
SetUpstream(scheduler.ScheduleAction(this, @this => @this.ForwardOnError(@this._exception)));
SetUpstream(scheduler.ScheduleAction(this, static @this => @this.ForwardOnError(@this._exception)));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ public _(IObservable<TSource> other, IObserver<TSource> observer)

public void Run(Absolute parent)
{
SetUpstream(parent._scheduler.ScheduleAction(this, parent._dueTime, @this => @this.Timeout()));
SetUpstream(parent._scheduler.ScheduleAction(this, parent._dueTime, static @this => @this.Timeout()));

Disposable.TrySetSingle(ref _serialDisposable, parent._source.SubscribeSafe(this));
}
Expand Down
10 changes: 5 additions & 5 deletions Rx.NET/Source/src/System.Reactive/Linq/Observable/Timer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public _(TimeSpan period, IObserver<long> observer)

public void Run(Periodic parent, DateTimeOffset dueTime)
{
SetUpstream(parent._scheduler.Schedule(this, dueTime, (innerScheduler, @this) => @this.InvokeStart(innerScheduler)));
SetUpstream(parent._scheduler.Schedule(this, dueTime, static (innerScheduler, @this) => @this.InvokeStart(innerScheduler)));
}

public void Run(Periodic parent, TimeSpan dueTime)
Expand All @@ -140,11 +140,11 @@ public void Run(Periodic parent, TimeSpan dueTime)
//
if (dueTime == _period)
{
SetUpstream(parent._scheduler.SchedulePeriodic(this, _period, @this => @this.Tick()));
SetUpstream(parent._scheduler.SchedulePeriodic(this, _period, static @this => @this.Tick()));
}
else
{
SetUpstream(parent._scheduler.Schedule(this, dueTime, (innerScheduler, @this) => @this.InvokeStart(innerScheduler)));
SetUpstream(parent._scheduler.Schedule(this, dueTime, static (innerScheduler, @this) => @this.InvokeStart(innerScheduler)));
}
}

Expand Down Expand Up @@ -227,7 +227,7 @@ private IDisposable InvokeStart(IScheduler self)
var d = new SingleAssignmentDisposable();
_periodic = d;
_index = 1;
d.Disposable = self.SchedulePeriodic(this, _period, @this => @this.Tock());
d.Disposable = self.SchedulePeriodic(this, _period, static @this => @this.Tock());

try
{
Expand All @@ -247,7 +247,7 @@ private IDisposable InvokeStart(IScheduler self)
//
if (Interlocked.Decrement(ref _pendingTickCount) > 0)
{
var c = self.Schedule((@this: this, index: 1L), (tuple, action) => tuple.@this.CatchUp(tuple.index, action));
var c = self.Schedule((@this: this, index: 1L), static (tuple, action) => tuple.@this.CatchUp(tuple.index, action));

return StableCompositeDisposable.Create(d, c);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public void Run(IEnumerable<TSource> source, IScheduler scheduler)
// is used to have LoopRec bail out and perform proper clean-up of the
// enumerator.
//
scheduler.Schedule(this, (innerScheduler, @this) => @this.LoopRec(innerScheduler));
scheduler.Schedule(this, static (innerScheduler, @this) => @this.LoopRec(innerScheduler));
}

protected override void Dispose(bool disposing)
Expand Down Expand Up @@ -119,7 +119,7 @@ private IDisposable LoopRec(IScheduler scheduler)
// is used to have LoopRec bail out and perform proper clean-up of the
// enumerator.
//
scheduler.Schedule(this, (innerScheduler, @this) => @this.LoopRec(innerScheduler));
scheduler.Schedule(this, static (innerScheduler, @this) => @this.LoopRec(innerScheduler));

return Disposable.Empty;
}
Expand Down Expand Up @@ -162,7 +162,7 @@ public void Run(IEnumerable<TSource> source, ISchedulerLongRunning scheduler)
return;
}

SetUpstream(scheduler.ScheduleLongRunning((@this: this, e), (tuple, cancelable) => tuple.@this.Loop(tuple.e, cancelable)));
SetUpstream(scheduler.ScheduleLongRunning((@this: this, e), static (tuple, cancelable) => tuple.@this.Loop(tuple.e, cancelable)));
}

private void Loop(IEnumerator<TSource> enumerator, ICancelable cancel)
Expand Down
Loading