diff --git a/Rx.NET/Source/src/System.Reactive/Linq/Observable/Multicast.cs b/Rx.NET/Source/src/System.Reactive/Linq/Observable/Multicast.cs index 0b63865a29..643e9269fa 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/Observable/Multicast.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/Observable/Multicast.cs @@ -2,8 +2,6 @@ // The .NET Foundation licenses this file to you under the MIT License. // See the LICENSE file in the project root for more information. -#nullable disable - using System.Reactive.Disposables; using System.Reactive.Subjects; @@ -28,7 +26,7 @@ public Multicast(IObservable source, Func { - private IDisposable _connection; + private IDisposable? _connection; public _(IObserver observer) : base(observer) diff --git a/Rx.NET/Source/src/System.Reactive/Linq/Observable/RefCount.cs b/Rx.NET/Source/src/System.Reactive/Linq/Observable/RefCount.cs index 1154603bf0..e42157892c 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/Observable/RefCount.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/Observable/RefCount.cs @@ -2,8 +2,6 @@ // The .NET Foundation licenses this file to you under the MIT License. // See the LICENSE file in the project root for more information. -#nullable disable - using System.Reactive.Concurrency; using System.Reactive.Disposables; using System.Reactive.Subjects; @@ -17,20 +15,20 @@ internal sealed class Eager : Producer { private readonly IConnectableObservable _source; - private readonly object _gate; + private readonly object _gate = new object(); + /// /// Contains the current active connection's state or null /// if no connection is active at the moment. /// Should be manipulated while holding the lock. /// - private RefConnection _connection; + private RefConnection? _connection; private readonly int _minObservers; public Eager(IConnectableObservable source, int minObservers) { _source = source; - _gate = new object(); _minObservers = minObservers; } @@ -41,13 +39,14 @@ public Eager(IConnectableObservable source, int minObservers) internal sealed class _ : IdentitySink { private readonly Eager _parent; + /// /// Contains the connection reference the downstream observer /// has subscribed to. Its purpose is to /// avoid subscribing, connecting and disconnecting /// while holding a lock. /// - private RefConnection _targetConnection; + private RefConnection? _targetConnection; public _(IObserver observer, Eager parent) : base(observer) @@ -58,12 +57,13 @@ public _(IObserver observer, Eager parent) public void Run() { bool doConnect; - RefConnection conn; + RefConnection? conn; lock (_parent._gate) { // get the active connection state conn = _parent._connection; + // if null, a new connection should be established if (conn == null) { @@ -74,12 +74,14 @@ public void Run() // this is the first observer, then connect doConnect = ++conn._count == _parent._minObservers; + // save the current connection for this observer _targetConnection = conn; } // subscribe to the source first Run(_parent._source); + // then connect the source if necessary if (doConnect && !Disposable.GetIsDisposed(ref conn._disposable)) { @@ -94,10 +96,11 @@ public void Run() protected override void Dispose(bool disposing) { base.Dispose(disposing); + if (disposing) { // get and forget the saved connection - var targetConnection = _targetConnection; + var targetConnection = _targetConnection!; // NB: Always set by Run prior to calling Dispose, and base class hardens protects against double-dispose. _targetConnection = null; lock (_parent._gate) @@ -110,6 +113,7 @@ protected override void Dispose(bool disposing) // nothing to do. return; } + // forget the current connection _parent._connection = null; } @@ -127,7 +131,7 @@ protected override void Dispose(bool disposing) private sealed class RefConnection { internal int _count; - internal IDisposable _disposable; + internal IDisposable? _disposable; } } @@ -139,9 +143,9 @@ internal sealed class Lazy : Producer private readonly IConnectableObservable _source; private readonly int _minObservers; - private IDisposable _serial; + private IDisposable? _serial; private int _count; - private IDisposable _connectableSubscription; + private IDisposable? _connectableSubscription; public Lazy(IConnectableObservable source, TimeSpan disconnectTime, IScheduler scheduler, int minObservers) { @@ -182,7 +186,7 @@ public void Run(Lazy parent) SetUpstream(Disposable.Create( (parent, subscription), - tuple => + static tuple => { var (closureParent, closureSubscription) = tuple; @@ -192,15 +196,17 @@ public void Run(Lazy parent) { if (--closureParent._count == 0) { - var cancelable = (SingleAssignmentDisposable)Volatile.Read(ref closureParent._serial); + // NB: _serial is guaranteed to be set by TrySetSerial earlier on. + var cancelable = (SingleAssignmentDisposable)Volatile.Read(ref closureParent._serial)!; - cancelable.Disposable = closureParent._scheduler.ScheduleAction((cancelable, closureParent), closureParent._disconnectTime, tuple2 => + cancelable.Disposable = closureParent._scheduler.ScheduleAction((cancelable, closureParent), closureParent._disconnectTime, static tuple2 => { lock (tuple2.closureParent._gate) { if (ReferenceEquals(Volatile.Read(ref tuple2.closureParent._serial), tuple2.cancelable)) { - tuple2.closureParent._connectableSubscription.Dispose(); + // NB: _connectableSubscription is guaranteed to be set above, and Disposable.Create protects against double-dispose. + tuple2.closureParent._connectableSubscription!.Dispose(); tuple2.closureParent._connectableSubscription = null; } } diff --git a/Rx.NET/Source/src/System.Reactive/Linq/Observable/Window.cs b/Rx.NET/Source/src/System.Reactive/Linq/Observable/Window.cs index ebc15bee2d..8d33954057 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/Observable/Window.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/Observable/Window.cs @@ -2,8 +2,6 @@ // The .NET Foundation licenses this file to you under the MIT License. // See the LICENSE file in the project root for more information. -#nullable disable - using System.Collections.Generic; using System.Reactive.Concurrency; using System.Reactive.Disposables; @@ -146,7 +144,7 @@ public _(TimeSliding parent, IObserver> observer) _timeShift = parent._timeShift; } - private RefCountDisposable _refCountDisposable; + private RefCountDisposable? _refCountDisposable; private TimeSpan _totalTime; private TimeSpan _nextShift; private TimeSpan _nextSpan; @@ -172,7 +170,7 @@ private void CreateWindow() { var s = new Subject(); _q.Enqueue(s); - ForwardOnNext(new WindowObservable(s, _refCountDisposable)); + ForwardOnNext(new WindowObservable(s, _refCountDisposable!)); // NB: _refCountDisposable gets assigned in Run. } private void CreateTimer() @@ -297,21 +295,22 @@ public TimeHopping(IObservable source, TimeSpan timeSpan, IScheduler sc internal sealed class _ : Sink> { private readonly object _gate = new object(); + private Subject _subject; public _(IObserver> observer) : base(observer) { + _subject = new Subject(); } - private Subject _subject; - private RefCountDisposable _refCountDisposable; + private RefCountDisposable? _refCountDisposable; public void Run(TimeHopping parent) { var groupDisposable = new CompositeDisposable(2); _refCountDisposable = new RefCountDisposable(groupDisposable); - CreateWindow(); + NextWindow(); groupDisposable.Add(parent._scheduler.SchedulePeriodic(this, parent._timeSpan, static @this => @this.Tick())); groupDisposable.Add(parent._source.SubscribeSafe(this)); @@ -324,14 +323,15 @@ private void Tick() lock (_gate) { _subject.OnCompleted(); - CreateWindow(); + + _subject = new Subject(); + NextWindow(); } } - private void CreateWindow() + private void NextWindow() { - _subject = new Subject(); - ForwardOnNext(new WindowObservable(_subject, _refCountDisposable)); + ForwardOnNext(new WindowObservable(_subject, _refCountDisposable!)); // NB: _refCountDisposable gets assigned in Run. } public override void OnNext(TSource value) @@ -392,26 +392,28 @@ internal sealed class _ : Sink> private readonly TimeSpan _timeSpan; private readonly IScheduler _scheduler; + private Subject _s; + public _(Ferry parent, IObserver> observer) : base(observer) { _count = parent._count; _timeSpan = parent._timeSpan; _scheduler = parent._scheduler; + + _s = new Subject(); } - private Subject _s; private int _n; - private RefCountDisposable _refCountDisposable; + private RefCountDisposable? _refCountDisposable; public override void Run(IObservable source) { var groupDisposable = new CompositeDisposable(2) { _timerD }; _refCountDisposable = new RefCountDisposable(groupDisposable); - _s = new Subject(); - ForwardOnNext(new WindowObservable(_s, _refCountDisposable)); + NextWindow(); CreateTimer(_s); groupDisposable.Add(source.SubscribeSafe(this)); @@ -427,6 +429,11 @@ private void CreateTimer(Subject window) m.Disposable = _scheduler.ScheduleAction((@this: this, window), _timeSpan, static tuple => tuple.@this.Tick(tuple.window)); } + private void NextWindow() + { + ForwardOnNext(new WindowObservable(_s, _refCountDisposable!)); // NB: _refCountDisposable gets assigned in Run. + } + private void Tick(Subject window) { Subject newWindow; @@ -443,7 +450,7 @@ private void Tick(Subject window) _s.OnCompleted(); _s = newWindow; - ForwardOnNext(new WindowObservable(_s, _refCountDisposable)); + NextWindow(); } CreateTimer(newWindow); @@ -451,7 +458,7 @@ private void Tick(Subject window) public override void OnNext(TSource value) { - Subject newWindow = null; + Subject? newWindow = null; lock (_gate) { @@ -465,7 +472,7 @@ public override void OnNext(TSource value) _s.OnCompleted(); _s = newWindow; - ForwardOnNext(new WindowObservable(_s, _refCountDisposable)); + NextWindow(); } } @@ -518,27 +525,26 @@ internal sealed class _ : Sink> private readonly object _gate = new object(); private readonly AsyncLock _windowGate = new AsyncLock(); private readonly SerialDisposable _m = new SerialDisposable(); - private readonly Func> _windowClosingSelector; + private Subject _window; + public _(Selector parent, IObserver> observer) : base(observer) { _windowClosingSelector = parent._windowClosingSelector; + + _window = new Subject(); } - private ISubject _window; - private RefCountDisposable _refCountDisposable; + private RefCountDisposable? _refCountDisposable; public override void Run(IObservable source) { - _window = new Subject(); - var groupDisposable = new CompositeDisposable(2) { _m }; _refCountDisposable = new RefCountDisposable(groupDisposable); - var window = new WindowObservable(_window, _refCountDisposable); - ForwardOnNext(window); + NextWindow(); groupDisposable.Add(source.SubscribeSafe(this)); @@ -547,6 +553,12 @@ public override void Run(IObservable source) SetUpstream(_refCountDisposable); } + private void NextWindow() + { + var window = new WindowObservable(_window, _refCountDisposable!); // NB: _refCountDisposable gets assigned in Run. + ForwardOnNext(window); + } + private void CreateWindowClose() { IObservable windowClose; @@ -577,8 +589,7 @@ private void CloseWindow(IDisposable closingSubscription) _window.OnCompleted(); _window = new Subject(); - var window = new WindowObservable(_window, _refCountDisposable); - ForwardOnNext(window); + NextWindow(); } _windowGate.Wait(this, static @this => @this.CreateWindowClose()); @@ -656,23 +667,22 @@ internal sealed class _ : Sink> { private readonly object _gate = new object(); + private Subject _window; + public _(IObserver> observer) : base(observer) { + _window = new Subject(); } - private ISubject _window; - private RefCountDisposable _refCountDisposable; + private RefCountDisposable? _refCountDisposable; public void Run(Boundaries parent) { - _window = new Subject(); - var d = new CompositeDisposable(2); _refCountDisposable = new RefCountDisposable(d); - var window = new WindowObservable(_window, _refCountDisposable); - ForwardOnNext(window); + NextWindow(); d.Add(parent._source.SubscribeSafe(this)); d.Add(parent._windowBoundaries.SubscribeSafe(new WindowClosingObserver(this))); @@ -680,6 +690,12 @@ public void Run(Boundaries parent) SetUpstream(_refCountDisposable); } + private void NextWindow() + { + var window = new WindowObservable(_window, _refCountDisposable!); // NB: _refCountDisposable gets assigned in Run. + ForwardOnNext(window); + } + private sealed class WindowClosingObserver : IObserver { private readonly _ _parent; @@ -696,8 +712,7 @@ public void OnNext(TWindowClosing value) _parent._window.OnCompleted(); _parent._window = new Subject(); - var window = new WindowObservable(_parent._window, _parent._refCountDisposable); - _parent.ForwardOnNext(window); + _parent.NextWindow(); } }