-
Notifications
You must be signed in to change notification settings - Fork 754
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Rework subscription in Producer such that all Sinks set the upstream disposable by themselves. #575
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. There is a change to the csproj file which I'd think is due to VS eagerly adding stuff because you touched the generator files.
|
||
stack = new Stack<IEnumerator<IObservable<TSource>>>(); | ||
stack.Push(current); | ||
|
||
Drain(); | ||
|
||
return new RecursiveSinkDisposable(this); | ||
SetUpstream(new RecursiveSinkDisposable(this)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably this allocation can now be eliminated by overriding Dispose(bool)
and inlining that DisposeAll
method there. Btw there are some operators that do not have an upstream so for them, this is not really needed anymore.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not so trivial, the Sink would have to be protected against disposal before Run has run. Can do it in a follow up.
{ | ||
_list = new List<TSource>(); | ||
|
||
var d = parent._scheduler.SchedulePeriodic(parent._timeSpan, Tick); | ||
var s = parent._source.SubscribeSafe(this); | ||
|
||
return StableCompositeDisposable.Create(d, s); | ||
SetUpstream(StableCompositeDisposable.Create(d, s)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
With the new structure, we can eliminate most StableCompositeDisposable
creation now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good, in a follow-up PR then.
@@ -98,7 +98,7 @@ for (var j = 1; j <= i; j++) | |||
} | |||
#> | |||
|
|||
return StableCompositeDisposable.Create(subscriptions); | |||
Set(StableCompositeDisposable.Create(subscriptions)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did you mean SetUpstream
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes...
{ | ||
|
||
CreateTimer(0L); | ||
|
||
Disposable.SetSingle(ref _mainDisposable, source.SubscribeSafe(this)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
_mainDisposable
could be _upstream
now so this can be just SetUpstream
and no need for a separate field.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm...but _mainDisposable must be disposed without disposing the entire sink (L85).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, right, forgot you only added SetUpstream, and the field is still inaccessible from subclasses. Maybe worth considering so that a field can be saved if there's already one in the base Sink.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The upstream could have SerialDisposable-semantics, that way it should be possible to save the _otherDisposable field as well. I would not like to expose otherwise private fields.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SetUpstream would then call SetSerial.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Combining these woudl require different atomics thant SetSerial. I think SetUpstream
can stay as SetSingle
but _upstream
could become protected
so that other Disposable.TryX
methods can be applied on it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why wouldn't it work for Timeout? It just replaces the main-upstream by the other-upstream. No combining.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The return of SubscribeSafe
is racing with the rest of the operator; a similar bug was fixed in #560.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're sure you don't refer to _timerDisposable? Maybe I'll work it into a subsequent PR we can then talk about, the idea seems nice and I think it can work. If it has races, you could show me what I'm missing.
@@ -42,10 +42,12 @@ public IDisposable Run(Using<TSource, TResource> parent) | |||
} | |||
catch (Exception exception) | |||
{ | |||
return StableCompositeDisposable.Create(Observable.Throw<TSource>(exception).SubscribeSafe(this), disposable); | |||
SetUpstream(StableCompositeDisposable.Create(Observable.Throw<TSource>(exception).SubscribeSafe(this), disposable)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not the fault of this PR but why create a sequence just to signal OnError(exception) practically immediately?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe in earlier code there was a scheduler involved.
@@ -110,7 +110,7 @@ for (var j = 1; j <= i; j++) | |||
#> | |||
}); | |||
|
|||
return StableCompositeDisposable.Create(disposables); | |||
Set(StableCompositeDisposable.Create(disposables)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did you mean SetUpstream
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes.
@@ -56,5 +56,8 @@ | |||
<None Update="Linq\Observable\Zip.Generated.tt" Generator="TextTemplatingFileGenerator" LastGenOutput="Zip.Generated.cs" /> | |||
<Compile Update="Linq\Observable\Zip.Generated.cs" DesignTime="True" AutoGen="True" DependentUpon="Zip.Generated.tt" /> | |||
</ItemGroup> | |||
<ItemGroup> | |||
<Service Include="{508349b6-6b84-4df5-91f0-309beebad82d}" /> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
???
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will be reverted.
…disposable by themselves. This will save up to two allocations per subscription.
Sinks will no longer be passed an IDisposable on creation. Previously, the passed IDisposable would (through a SingleAssignmentDisposable) represent the upstream subscription (or whatever Producer.Run would return). Now, every Sink is responsible for setting the upstream by itself.