Skip to content

Commit

Permalink
Merge pull request #1368 from dotnet/dev/bartde/rx_nullable_part22
Browse files Browse the repository at this point in the history
Enable #nullable for the last few operators.
  • Loading branch information
bartdesmet committed Oct 3, 2020
2 parents fdcf836 + 5f749e8 commit 485e5bb
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@
// The .NET Foundation licenses this file to you under the MIT License.
// See the LICENSE file in the project root for more information.

#nullable disable

using System.Reactive.Disposables;
using System.Reactive.Subjects;

Expand All @@ -28,7 +26,7 @@ public Multicast(IObservable<TSource> source, Func<ISubject<TSource, TIntermedia

internal sealed class _ : IdentitySink<TResult>
{
private IDisposable _connection;
private IDisposable? _connection;

public _(IObserver<TResult> observer)
: base(observer)
Expand Down
36 changes: 21 additions & 15 deletions Rx.NET/Source/src/System.Reactive/Linq/Observable/RefCount.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@
// The .NET Foundation licenses this file to you under the MIT License.
// See the LICENSE file in the project root for more information.

#nullable disable

using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Subjects;
Expand All @@ -17,20 +15,20 @@ internal sealed class Eager : Producer<TSource, Eager._>
{
private readonly IConnectableObservable<TSource> _source;

private readonly object _gate;
private readonly object _gate = new object();

/// <summary>
/// Contains the current active connection's state or null
/// if no connection is active at the moment.
/// Should be manipulated while holding the <see cref="_gate"/> lock.
/// </summary>
private RefConnection _connection;
private RefConnection? _connection;

private readonly int _minObservers;

public Eager(IConnectableObservable<TSource> source, int minObservers)
{
_source = source;
_gate = new object();
_minObservers = minObservers;
}

Expand All @@ -41,13 +39,14 @@ public Eager(IConnectableObservable<TSource> source, int minObservers)
internal sealed class _ : IdentitySink<TSource>
{
private readonly Eager _parent;

/// <summary>
/// Contains the connection reference the downstream observer
/// has subscribed to. Its purpose is to
/// avoid subscribing, connecting and disconnecting
/// while holding a lock.
/// </summary>
private RefConnection _targetConnection;
private RefConnection? _targetConnection;

public _(IObserver<TSource> observer, Eager parent)
: base(observer)
Expand All @@ -58,12 +57,13 @@ public _(IObserver<TSource> observer, Eager parent)
public void Run()
{
bool doConnect;
RefConnection conn;
RefConnection? conn;

lock (_parent._gate)
{
// get the active connection state
conn = _parent._connection;

// if null, a new connection should be established
if (conn == null)
{
Expand All @@ -74,12 +74,14 @@ public void Run()

// this is the first observer, then connect
doConnect = ++conn._count == _parent._minObservers;

// save the current connection for this observer
_targetConnection = conn;
}

// subscribe to the source first
Run(_parent._source);

// then connect the source if necessary
if (doConnect && !Disposable.GetIsDisposed(ref conn._disposable))
{
Expand All @@ -94,10 +96,11 @@ public void Run()
protected override void Dispose(bool disposing)
{
base.Dispose(disposing);

if (disposing)
{
// get and forget the saved connection
var targetConnection = _targetConnection;
var targetConnection = _targetConnection!; // NB: Always set by Run prior to calling Dispose, and base class hardens protects against double-dispose.
_targetConnection = null;

lock (_parent._gate)
Expand All @@ -110,6 +113,7 @@ protected override void Dispose(bool disposing)
// nothing to do.
return;
}

// forget the current connection
_parent._connection = null;
}
Expand All @@ -127,7 +131,7 @@ protected override void Dispose(bool disposing)
private sealed class RefConnection
{
internal int _count;
internal IDisposable _disposable;
internal IDisposable? _disposable;
}
}

Expand All @@ -139,9 +143,9 @@ internal sealed class Lazy : Producer<TSource, Lazy._>
private readonly IConnectableObservable<TSource> _source;
private readonly int _minObservers;

private IDisposable _serial;
private IDisposable? _serial;
private int _count;
private IDisposable _connectableSubscription;
private IDisposable? _connectableSubscription;

public Lazy(IConnectableObservable<TSource> source, TimeSpan disconnectTime, IScheduler scheduler, int minObservers)
{
Expand Down Expand Up @@ -182,7 +186,7 @@ public void Run(Lazy parent)

SetUpstream(Disposable.Create(
(parent, subscription),
tuple =>
static tuple =>
{
var (closureParent, closureSubscription) = tuple;
Expand All @@ -192,15 +196,17 @@ public void Run(Lazy parent)
{
if (--closureParent._count == 0)
{
var cancelable = (SingleAssignmentDisposable)Volatile.Read(ref closureParent._serial);
// NB: _serial is guaranteed to be set by TrySetSerial earlier on.
var cancelable = (SingleAssignmentDisposable)Volatile.Read(ref closureParent._serial)!;
cancelable.Disposable = closureParent._scheduler.ScheduleAction((cancelable, closureParent), closureParent._disconnectTime, tuple2 =>
cancelable.Disposable = closureParent._scheduler.ScheduleAction((cancelable, closureParent), closureParent._disconnectTime, static tuple2 =>
{
lock (tuple2.closureParent._gate)
{
if (ReferenceEquals(Volatile.Read(ref tuple2.closureParent._serial), tuple2.cancelable))
{
tuple2.closureParent._connectableSubscription.Dispose();
// NB: _connectableSubscription is guaranteed to be set above, and Disposable.Create protects against double-dispose.
tuple2.closureParent._connectableSubscription!.Dispose();
tuple2.closureParent._connectableSubscription = null;
}
}
Expand Down
Loading

0 comments on commit 485e5bb

Please sign in to comment.