Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

combineLatest pushEach strategy #55

Closed
oliviermilla opened this issue Jul 12, 2024 · 7 comments
Closed

combineLatest pushEach strategy #55

oliviermilla opened this issue Jul 12, 2024 · 7 comments

Comments

@oliviermilla
Copy link

oliviermilla commented Jul 12, 2024

Hello,

combineLatest offers both pushEach() and pushNew() strategies.

From the documentation, one would expect the first to publish at the first update of any of its sources and the latter to publish only after all sources have emitted at least once.

But

using Rocket
a = combineLatest(never() |> async(0), from([1,2]) |> async(0))
subscribe!(a, logger())

yields

julia> [LogActor] Completed
julia> 

While I expected to get (nothing, 1), (nothing, 2) before completion.

Am I reading the documentation right?
Would another operator yield as soon as ANY source yields?

Thank you ❤️

@bvdmitri
Copy link
Member

bvdmitri commented Jul 13, 2024

combineLatest combines values from both sources into a single event, but you’re probably looking for merged, which merges different streams of events into a single stream. combineLatest cannot emit a value if one of the sources does not emit anything (as in your example with never).

never doesn't emit nothing as an event, it simply doesn't emit anything and combineLatest doesn't have any value to combine with. If you want to send nothing you can use of(nothing) to achieve this.

@bvdmitri
Copy link
Member

Also here

To ensure that the output tuple has a consistent length, combineLatest waits for all input Observables to emit at least once before it starts emitting results. This means that if some Observable emits values before other Observables started emitting, all these values but the last will be lost. On the other hand, if some Observable does not emit a value but completes, the resulting Observable will complete simultaneously without emitting anything.

@bvdmitri
Copy link
Member

For example

julia> subscribe!(combineLatest(of(nothing), from([ 1, 2 ])), logger())
[LogActor] Data: (nothing, 1)
[LogActor] Data: (nothing, 2)
[LogActor] Completed

@oliviermilla
Copy link
Author

Hello @bvdmitri,

Thank you for taking the time. I am, in fact, looking for combineLatest; I simply want a tuple of all inputs, whether or not they have emitted something yet.

merged serialize events but I could use it to trigger a single time, hopefully first.

Something like

subscribe!(combineLatest(merged(of(nothing),source1), merged(of(nothing), source2)), logger())

I'll look into it but it looks like I have to generate fake signals to get combineLatest, while pushEach() describes a trigger as soon as ANY input emits, which is exactly what I'm looking for! :)

Have a great weekend.

@bvdmitri
Copy link
Member

The problem is that never never completes, so combineLatest just waits indefinitely, you can use default_if_empty operator, but this one also requires for the internal stream to complete, e.g:

julia> a = combineLatest(completed() |> default_if_empty(nothing) |> async(0), from([1,2]) |> async(0))
CombineLatestObservable(Tuple{Any, Int64})

julia> subscribe!(a, logger())
[LogActor] Data: (nothing, 1)
[LogActor] Data: (nothing, 2)
CombineLatestSubscription()

@bauglir
Copy link

bauglir commented Jul 13, 2024

Perhaps using start_with is an alternative to ensure at least one known value is emitted for each input (which could then be ignored if necessary)? That's the pattern I'd typically use in cases like this.

subscribe!(combineLatest(source1 |> start_with(nothing), source2 |> start_with(nothing)), logger())

Optionally skipping the first emission where both fire nothing.

subscribe!(combineLatest(source1 |> start_with(nothing), source2 |> start_with(nothing)) |> skip(1), logger())

@oliviermilla
Copy link
Author

start_with seems to be the best option here.

Thanks!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants