Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

4.x: Use Schedule calls with state #558

Merged
merged 5 commits into from
Jun 1, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,11 @@ protected override IDisposable SubscribeCore(IObserver<TSource> observer)
var d = new SerialDisposable();
d.Disposable = m;

m.Disposable = scheduler.Schedule(() =>
m.Disposable = scheduler.Schedule((scheduler, source, observer, d),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you are using this overload, the scheduler will be passed. Convention throughout the rest of the code is to name it 'self' (now '_'). You can then omit it from the state-tuple.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was an odd case, wrapping the outer scheduler. I'm not sure if the inner scheduler would be the correct one in this situation.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is generally just the executing one.

(_, state) =>
{
d.Disposable = new ScheduledDisposable(scheduler, source.SubscribeSafe(observer));
state.d.Disposable = new ScheduledDisposable(state.scheduler, state.source.SubscribeSafe(state.observer));
return Disposable.Empty;
});

return d;
Expand Down
6 changes: 3 additions & 3 deletions Rx.NET/Source/src/System.Reactive/Linq/Observable/Delay.cs
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ private void DrainQueue(ICancelable cancel)
if (shouldWait)
{
var timer = new ManualResetEventSlim();
_scheduler.Schedule(waitTime, () => { timer.Set(); });
_scheduler.Schedule(timer, waitTime, (_, state) => { state.Set(); return Disposable.Empty; });
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider using the extension I recently added. You may omit the scheduler parameter (_), won't have to return Disposable.Empty everywhere.

Also, please give the inner variable a revealing name. I used to go for 'closureXYZ', e.g. closureTimer in this case. Just 'state' is not revealing.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem with that overload is that the parameters are after the lambda, which I find harder to read.

Naming it like closureTimer makes little sense as these are not captured values by the lambda like before, they are now arguments.

Copy link
Collaborator

@danielcweber danielcweber Jun 1, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changing the signature creates ambiguities unfortunately.

If not 'closureTimer', at least something more revealing than 'state'.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll rename these. You could introduce an internal extension under a different name but with the same ordering of arguments.

Copy link
Collaborator

@danielcweber danielcweber Jun 1, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can rename it in a subsequent PR and let ReSharper do the reordering. Would you then use the Action-extension for now?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find the ordering of the parameters distracting with those Action overloads. I'd prefer this ordering and a renamed internal schedule method.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok I will file a PR with a renamed method and then we can get the Disposable.Empty out of that?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure.


try
{
Expand Down Expand Up @@ -473,7 +473,7 @@ protected override void RunCore(Absolute parent)
{
_ready = false;

_cancelable.Disposable = parent._scheduler.Schedule(parent._dueTime, Start);
_cancelable.Disposable = parent._scheduler.Schedule(this, parent._dueTime, (_, state) => { state.Start(); return Disposable.Empty; });
}

private void Start()
Expand Down Expand Up @@ -521,7 +521,7 @@ public L(Absolute parent, IObserver<TSource> observer, IDisposable cancel)

protected override void RunCore(Absolute parent)
{
_cancelable.Disposable = parent._scheduler.Schedule(parent._dueTime, Start);
_cancelable.Disposable = parent._scheduler.Schedule(this, parent._dueTime, (_, state) => { state.Start(); return Disposable.Empty; });
}

private void Start()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,12 +284,19 @@ public IDisposable Connect(IObserver<TArgs> observer)
{
if (--_count == 0)
{
_parent._scheduler.Schedule(_removeHandler.Dispose);
_parent._scheduler.Schedule(_removeHandler, (_, self) => RemoveHandlerDispose(self));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

'self', throughout calls to Schedule, has bee 'traditionally' used to denote the executing scheduler. If 'this' is passed as state, I like using @this as a name.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll inline the method call back here.

_parent._session = null;
}
}
});
}

}

static IDisposable RemoveHandlerDispose(SingleAssignmentDisposable d)
{
d.Dispose();
return Disposable.Empty;
}

private void Initialize()
Expand Down
6 changes: 4 additions & 2 deletions Rx.NET/Source/src/System.Reactive/Linq/Observable/Return.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// See the LICENSE file in the project root for more information.

using System.Reactive.Concurrency;
using System.Reactive.Disposables;

namespace System.Reactive.Linq.ObservableImpl
{
Expand Down Expand Up @@ -33,13 +34,14 @@ public _(TResult value, IObserver<TResult> observer, IDisposable cancel)

public IDisposable Run(IScheduler scheduler)
{
return scheduler.Schedule(Invoke);
return scheduler.Schedule(this, (_, self) => self.Invoke());
}

private void Invoke()
private IDisposable Invoke()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As above, consider the new extension and don't repeat returning Disposable.Empty.

{
ForwardOnNext(_value);
ForwardOnCompleted();
return Disposable.Empty;
}
}
}
Expand Down
5 changes: 3 additions & 2 deletions Rx.NET/Source/src/System.Reactive/Linq/Observable/Skip.cs
Original file line number Diff line number Diff line change
Expand Up @@ -101,14 +101,15 @@ public _(IObserver<TSource> observer, IDisposable cancel)

public IDisposable Run(Time parent)
{
var t = parent._scheduler.Schedule(parent._duration, Tick);
var t = parent._scheduler.Schedule(this, parent._duration, (_, state) => state.Tick());
var d = parent._source.SubscribeSafe(this);
return StableCompositeDisposable.Create(t, d);
}

private void Tick()
private IDisposable Tick()
{
_open = true;
return Disposable.Empty;
}

public override void OnNext(TSource value)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,14 +165,15 @@ public _(IObserver<TSource> observer, IDisposable cancel)

public IDisposable Run(SkipUntil<TSource> parent)
{
var t = parent._scheduler.Schedule(parent._startTime, Tick);
var t = parent._scheduler.Schedule(this, parent._startTime, (_, state) => state.Tick());
var d = parent._source.SubscribeSafe(this);
return StableCompositeDisposable.Create(t, d);
}

private void Tick()
private IDisposable Tick()
{
_open = true;
return Disposable.Empty;
}

public override void OnNext(TSource value)
Expand Down
5 changes: 3 additions & 2 deletions Rx.NET/Source/src/System.Reactive/Linq/Observable/Take.cs
Original file line number Diff line number Diff line change
Expand Up @@ -112,17 +112,18 @@ public IDisposable Run(Time parent)
{
_gate = new object();

var t = parent._scheduler.Schedule(parent._duration, Tick);
var t = parent._scheduler.Schedule(this, parent._duration, (_, state) => state.Tick());
var d = parent._source.SubscribeSafe(this);
return StableCompositeDisposable.Create(t, d);
}

private void Tick()
private IDisposable Tick()
{
lock (_gate)
{
ForwardOnCompleted();
}
return Disposable.Empty;
}

public override void OnNext(TSource value)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,17 +180,18 @@ public _(IObserver<TSource> observer, IDisposable cancel)

public IDisposable Run(TakeUntil<TSource> parent)
{
var t = parent._scheduler.Schedule(parent._endTime, Tick);
var t = parent._scheduler.Schedule(this, parent._endTime, (_, state) => state.Tick());
var d = parent._source.SubscribeSafe(this);
return StableCompositeDisposable.Create(t, d);
}

private void Tick()
private IDisposable Tick()
{
lock (_gate)
{
ForwardOnCompleted();
}
return Disposable.Empty;
}

public override void OnNext(TSource value)
Expand Down
8 changes: 5 additions & 3 deletions Rx.NET/Source/src/System.Reactive/Linq/Observable/Throw.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// See the LICENSE file in the project root for more information.

using System.Reactive.Concurrency;
using System.Reactive.Disposables;

namespace System.Reactive.Linq.ObservableImpl
{
Expand Down Expand Up @@ -33,12 +34,13 @@ public _(Exception exception, IObserver<TResult> observer, IDisposable cancel)

public IDisposable Run(IScheduler scheduler)
{
return scheduler.Schedule(Invoke);
return scheduler.Schedule(this, (_, self) => Invoke(self));
}

private void Invoke()
private static IDisposable Invoke(_ self)
{
ForwardOnError(_exception);
self.ForwardOnError(self._exception);
return Disposable.Empty;
}
}
}
Expand Down
6 changes: 4 additions & 2 deletions Rx.NET/Source/src/System.Reactive/Linq/Observable/Timeout.cs
Original file line number Diff line number Diff line change
Expand Up @@ -188,14 +188,14 @@ public IDisposable Run(Absolute parent)

_switched = false;

var timer = parent._scheduler.Schedule(parent._dueTime, Timeout);
var timer = parent._scheduler.Schedule(this, parent._dueTime, (_, state) => state.Timeout());

original.Disposable = parent._source.SubscribeSafe(this);

return StableCompositeDisposable.Create(_subscription, timer);
}

private void Timeout()
private IDisposable Timeout()
{
var timerWins = false;

Expand All @@ -207,6 +207,8 @@ private void Timeout()

if (timerWins)
_subscription.Disposable = _other.SubscribeSafe(GetForwarder());

return Disposable.Empty;
}

public override void OnNext(TSource value)
Expand Down
7 changes: 4 additions & 3 deletions Rx.NET/Source/src/System.Reactive/Linq/Observable/Timer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,18 +58,19 @@ public _(IObserver<long> observer, IDisposable cancel)

public IDisposable Run(Single parent, DateTimeOffset dueTime)
{
return parent._scheduler.Schedule(dueTime, Invoke);
return parent._scheduler.Schedule(this, dueTime, (_, state) => state.Invoke());
}

public IDisposable Run(Single parent, TimeSpan dueTime)
{
return parent._scheduler.Schedule(dueTime, Invoke);
return parent._scheduler.Schedule(this, dueTime, (_, state) => state.Invoke());
}

private void Invoke()
private IDisposable Invoke()
{
ForwardOnNext(0);
ForwardOnCompleted();
return Disposable.Empty;
}
}
}
Expand Down
Loading