Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Behaviour change in Task<IDisposable> overload of Observable.Create in version 4.1? #815

Closed
daniel-smith opened this issue Sep 28, 2018 · 3 comments

Comments

@daniel-smith
Copy link

When updating from version 4.0 to 4.1, I observed some odd behaviour, which I boiled down to the following:

var obs = Observable.Create<int>(async o =>
{
    var inner = Observable.Range(1, 3);

    return inner.Subscribe(x => o.OnNext(x));
});

var result = obs.Take(1).Wait();

In our actual code we are calling an async method prior to subscribing to the inner observable, but there's no need to do that in order to reproduce the issue.

In version 4.0, this works perfectly fine, and the value of result is 1.

In 4.1, an InvalidOperationException "Sequence contains no elements" is thrown. This only seems to occur if the inner stream does not complete, i.e. if I replace the Observable.Range(1, 3) to Observable.Return(1), it works fine.

We fixed our code by converting the actual async call we were doing to an observable by using Observable.FromAsync, then doing a SelectMany over that (which is probably a better solution anyway). Surprised to see this behaviour change though. Could this be a bug?

@daniel-smith daniel-smith changed the title Behaviour change in Task<IDisposable>overload of Observable.Create in version 4.1? Behaviour change in Task<IDisposable> overload of Observable.Create in version 4.1? Sep 28, 2018
@dunkymole
Copy link

dunkymole commented Sep 28, 2018

Just to add to this. The OP didnt include an await in the async lambda. I have observed that one not only needs an await, the await needs to complete asynchronously.

This wont work:

    var obs = Observable.Create<int>(async o =>
    {
        await Task.CompletedTask;
        
        var inner = Observable.Range(1, 3);
        
        return inner.Subscribe(x => o.OnNext(x));
    });
    
    var result = obs.Take(1).Wait();

This is fine:

    {
        await Task.Factory.StartNew(() => {});
        
        var inner = Observable.Range(1, 3);
        
        return inner.Subscribe(x => o.OnNext(x));
    });
    
    var result = obs.Take(1).Wait();

 

@akarnokd
Copy link
Collaborator

The Task returned by the function is subscribed (!) to by TaskDisposeCompletionObserver which then completes the Observer of Take and thus no items are likely received from range.

Looks like there was a mistake in https://github.com/dotnet/reactive/pull/549/files#diff-88bb42c08f322e657242aaba642022feL66 where the rewrite added that OnComplete call. I'll post a PR shortly to fix that.

@akarnokd
Copy link
Collaborator

Interestingly, different overloads behaved differently:

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants