Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable #nullable for the last few operators. #1368

Merged
merged 1 commit into from
Oct 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@
// The .NET Foundation licenses this file to you under the MIT License.
// See the LICENSE file in the project root for more information.

#nullable disable

using System.Reactive.Disposables;
using System.Reactive.Subjects;

Expand All @@ -28,7 +26,7 @@ public Multicast(IObservable<TSource> source, Func<ISubject<TSource, TIntermedia

internal sealed class _ : IdentitySink<TResult>
{
private IDisposable _connection;
private IDisposable? _connection;

public _(IObserver<TResult> observer)
: base(observer)
Expand Down
36 changes: 21 additions & 15 deletions Rx.NET/Source/src/System.Reactive/Linq/Observable/RefCount.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@
// The .NET Foundation licenses this file to you under the MIT License.
// See the LICENSE file in the project root for more information.

#nullable disable

using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Subjects;
Expand All @@ -17,20 +15,20 @@ internal sealed class Eager : Producer<TSource, Eager._>
{
private readonly IConnectableObservable<TSource> _source;

private readonly object _gate;
private readonly object _gate = new object();

/// <summary>
/// Contains the current active connection's state or null
/// if no connection is active at the moment.
/// Should be manipulated while holding the <see cref="_gate"/> lock.
/// </summary>
private RefConnection _connection;
private RefConnection? _connection;

private readonly int _minObservers;

public Eager(IConnectableObservable<TSource> source, int minObservers)
{
_source = source;
_gate = new object();
_minObservers = minObservers;
}

Expand All @@ -41,13 +39,14 @@ public Eager(IConnectableObservable<TSource> source, int minObservers)
internal sealed class _ : IdentitySink<TSource>
{
private readonly Eager _parent;

/// <summary>
/// Contains the connection reference the downstream observer
/// has subscribed to. Its purpose is to
/// avoid subscribing, connecting and disconnecting
/// while holding a lock.
/// </summary>
private RefConnection _targetConnection;
private RefConnection? _targetConnection;

public _(IObserver<TSource> observer, Eager parent)
: base(observer)
Expand All @@ -58,12 +57,13 @@ public _(IObserver<TSource> observer, Eager parent)
public void Run()
{
bool doConnect;
RefConnection conn;
RefConnection? conn;

lock (_parent._gate)
{
// get the active connection state
conn = _parent._connection;

// if null, a new connection should be established
if (conn == null)
{
Expand All @@ -74,12 +74,14 @@ public void Run()

// this is the first observer, then connect
doConnect = ++conn._count == _parent._minObservers;

// save the current connection for this observer
_targetConnection = conn;
}

// subscribe to the source first
Run(_parent._source);

// then connect the source if necessary
if (doConnect && !Disposable.GetIsDisposed(ref conn._disposable))
{
Expand All @@ -94,10 +96,11 @@ public void Run()
protected override void Dispose(bool disposing)
{
base.Dispose(disposing);

if (disposing)
{
// get and forget the saved connection
var targetConnection = _targetConnection;
var targetConnection = _targetConnection!; // NB: Always set by Run prior to calling Dispose, and base class hardens protects against double-dispose.
_targetConnection = null;

lock (_parent._gate)
Expand All @@ -110,6 +113,7 @@ protected override void Dispose(bool disposing)
// nothing to do.
return;
}

// forget the current connection
_parent._connection = null;
}
Expand All @@ -127,7 +131,7 @@ protected override void Dispose(bool disposing)
private sealed class RefConnection
{
internal int _count;
internal IDisposable _disposable;
internal IDisposable? _disposable;
}
}

Expand All @@ -139,9 +143,9 @@ internal sealed class Lazy : Producer<TSource, Lazy._>
private readonly IConnectableObservable<TSource> _source;
private readonly int _minObservers;

private IDisposable _serial;
private IDisposable? _serial;
private int _count;
private IDisposable _connectableSubscription;
private IDisposable? _connectableSubscription;

public Lazy(IConnectableObservable<TSource> source, TimeSpan disconnectTime, IScheduler scheduler, int minObservers)
{
Expand Down Expand Up @@ -182,7 +186,7 @@ public void Run(Lazy parent)

SetUpstream(Disposable.Create(
(parent, subscription),
tuple =>
static tuple =>
{
var (closureParent, closureSubscription) = tuple;

Expand All @@ -192,15 +196,17 @@ public void Run(Lazy parent)
{
if (--closureParent._count == 0)
{
var cancelable = (SingleAssignmentDisposable)Volatile.Read(ref closureParent._serial);
// NB: _serial is guaranteed to be set by TrySetSerial earlier on.
var cancelable = (SingleAssignmentDisposable)Volatile.Read(ref closureParent._serial)!;

cancelable.Disposable = closureParent._scheduler.ScheduleAction((cancelable, closureParent), closureParent._disconnectTime, tuple2 =>
cancelable.Disposable = closureParent._scheduler.ScheduleAction((cancelable, closureParent), closureParent._disconnectTime, static tuple2 =>
{
lock (tuple2.closureParent._gate)
{
if (ReferenceEquals(Volatile.Read(ref tuple2.closureParent._serial), tuple2.cancelable))
{
tuple2.closureParent._connectableSubscription.Dispose();
// NB: _connectableSubscription is guaranteed to be set above, and Disposable.Create protects against double-dispose.
tuple2.closureParent._connectableSubscription!.Dispose();
tuple2.closureParent._connectableSubscription = null;
}
}
Expand Down
Loading