Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add withLatest(from:) (Rx.withLatestFrom) #128

Merged
merged 5 commits into from
Dec 4, 2016
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 71 additions & 0 deletions Sources/Signal.swift
Original file line number Diff line number Diff line change
Expand Up @@ -1021,6 +1021,77 @@ extension SignalProtocol {
.map { $0.0 }
}

/// Forward the latest value from `samplee` with the value from `self` as a
/// tuple, only when `self` sends a `value` event.
/// This is like a flipped version of `sample(with:)`, but `samplee`'s
/// terminal events are completely ignored.
///
/// - note: If `self` fires before a value has been observed on `samplee`,
/// nothing happens.
///
/// - parameters:
/// - samplee: A signal that its latest value is sampled by `self`.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this would read better as "A signal whose latest value..."

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 9580123.

///
/// - returns: A signal that will send values from `self` and `samplee`,
/// sampled (possibly multiple times) by `self`, then terminate
/// once `self` has terminated. **`samplee`'s terminated events
/// are ignored**.
public func withLatest<U>(from samplee: Signal<U, NoError>) -> Signal<(Value, U), Error> {
return Signal { observer in
let state = Atomic<U?>(nil)
let disposable = CompositeDisposable()

disposable += samplee.observeValues { value in
state.value = value
}

disposable += self.observe { event in
switch event {
case let .value(value):
if let value2 = state.value {
observer.send(value: (value, value2))
}
case .completed:
observer.sendCompleted()
case let .failed(error):
observer.send(error: error)
case .interrupted:
observer.sendInterrupted()
}
}

return disposable
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be implemented with sample(with:)?

return samplee.sample(with: self).map { ($1, $0) }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To achieve the simplest map { ($0, stateProperty.value) } alternative as possible, I think returning signal should not get interfered with any of samplee's terminal events.

sample(with:), on the other hand, will wait for both sampler and samplee's .completed to finally emit its .completed, so this PR is not just a flipped sample(with:).

For comparison, in RxSwift, samplee's .completed is ignored but .failed is forwarded.

(As a side note, I have restricted samplee's error type as NoError only for now, allowing a future extension to also support samplee with non-NoError types when decision (ignore or forward error) is settled.)

}

/// Forward the latest value from `samplee` with the value from `self` as a
/// tuple, only when `self` sends a `value` event.
/// This is like a flipped version of `sample(with:)`, but `samplee`'s
/// terminal events are completely ignored.
///
/// - note: If `self` fires before a value has been observed on `samplee`,
/// nothing happens.
///
/// - parameters:
/// - samplee: A producer that its latest value is sampled by `self`.
///
/// - returns: A signal that will send values from `self` and `samplee`,
/// sampled (possibly multiple times) by `self`, then terminate
/// once `self` has terminated. **`samplee`'s terminated events
/// are ignored**.
public func withLatest<U>(from samplee: SignalProducer<U, NoError>) -> Signal<(Value, U), Error> {
return Signal { observer in
let d = CompositeDisposable()
samplee.startWithSignal { signal, disposable in
d += disposable
d += self.withLatest(from: signal).observe(observer)
}
return d
}
}
}

extension SignalProtocol {
/// Forwards events from `self` until `lifetime` ends, at which point the
/// returned signal will complete.
///
Expand Down
38 changes: 38 additions & 0 deletions Sources/SignalProducer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -812,6 +812,44 @@ extension SignalProducerProtocol {
return lift(Signal.sample(on:))(sampler)
}

/// Forward the latest value from `samplee` with the value from `self` as a
/// tuple, only when `self` sends a `value` event.
/// This is like a flipped version of `sample(with:)`, but `samplee`'s
/// terminal events are completely ignored.
///
/// - note: If `self` fires before a value has been observed on `samplee`,
/// nothing happens.
///
/// - parameters:
/// - samplee: A producer that its latest value is sampled by `self`.
///
/// - returns: A signal that will send values from `self` and `samplee`,
/// sampled (possibly multiple times) by `self`, then terminate
/// once `self` has terminated. **`samplee`'s terminated events
/// are ignored**.
public func withLatest<U>(from samplee: SignalProducer<U, NoError>) -> SignalProducer<(Value, U), Error> {
return liftRight(Signal.withLatest)(samplee)
}

/// Forward the latest value from `samplee` with the value from `self` as a
/// tuple, only when `self` sends a `value` event.
/// This is like a flipped version of `sample(with:)`, but `samplee`'s
/// terminal events are completely ignored.
///
/// - note: If `self` fires before a value has been observed on `samplee`,
/// nothing happens.
///
/// - parameters:
/// - samplee: A signal that its latest value is sampled by `self`.
///
/// - returns: A signal that will send values from `self` and `samplee`,
/// sampled (possibly multiple times) by `self`, then terminate
/// once `self` has terminated. **`samplee`'s terminated events
/// are ignored**.
public func withLatest<U>(from samplee: Signal<U, NoError>) -> SignalProducer<(Value, U), Error> {
return lift(Signal.withLatest)(samplee)
}

/// Forwards events from `self` until `lifetime` ends, at which point the
/// returned producer will complete.
///
Expand Down
132 changes: 132 additions & 0 deletions Tests/ReactiveSwiftTests/SignalProducerLiftingSpec.swift
Original file line number Diff line number Diff line change
Expand Up @@ -1114,6 +1114,138 @@ class SignalProducerLiftingSpec: QuickSpec {
}
}

describe("withLatest(from: signal)") {
var withLatestProducer: SignalProducer<(Int, String), NoError>!
var observer: Signal<Int, NoError>.Observer!
var sampleeObserver: Signal<String, NoError>.Observer!

beforeEach {
let (producer, incomingObserver) = SignalProducer<Int, NoError>.pipe()
let (samplee, incomingSampleeObserver) = Signal<String, NoError>.pipe()
withLatestProducer = producer.withLatest(from: samplee)
observer = incomingObserver
sampleeObserver = incomingSampleeObserver
}

it("should forward the latest value when the receiver fires") {
var result: [String] = []
withLatestProducer.startWithValues { (left, right) in result.append("\(left)\(right)") }

sampleeObserver.send(value: "a")
sampleeObserver.send(value: "b")
observer.send(value: 1)
expect(result) == [ "1b" ]
}

it("should do nothing if receiver fires before samplee sends value") {
var result: [String] = []
withLatestProducer.startWithValues { (left, right) in result.append("\(left)\(right)") }

observer.send(value: 1)
expect(result).to(beEmpty())
}

it("should send latest value with samplee value multiple times when receiver fires multiple times") {
var result: [String] = []
withLatestProducer.startWithValues { (left, right) in result.append("\(left)\(right)") }

sampleeObserver.send(value: "a")
observer.send(value: 1)
observer.send(value: 2)
expect(result) == [ "1a", "2a" ]
}

it("should complete when receiver has completed") {
var completed = false
withLatestProducer.startWithCompleted { completed = true }

observer.sendCompleted()
expect(completed) == true
}

it("should not affect when samplee has completed") {
var event: Event<(Int, String), NoError>? = nil
withLatestProducer.start { event = $0 }

sampleeObserver.sendCompleted()
expect(event).to(beNil())
}

it("should not affect when samplee has interrupted") {
var event: Event<(Int, String), NoError>? = nil
withLatestProducer.start { event = $0 }

sampleeObserver.sendInterrupted()
expect(event).to(beNil())
}
}

describe("withLatest(from: producer)") {
var withLatestProducer: SignalProducer<(Int, String), NoError>!
var observer: Signal<Int, NoError>.Observer!
var sampleeObserver: Signal<String, NoError>.Observer!

beforeEach {
let (producer, incomingObserver) = SignalProducer<Int, NoError>.pipe()
let (samplee, incomingSampleeObserver) = SignalProducer<String, NoError>.pipe()
withLatestProducer = producer.withLatest(from: samplee)
observer = incomingObserver
sampleeObserver = incomingSampleeObserver
}

it("should forward the latest value when the receiver fires") {
var result: [String] = []
withLatestProducer.startWithValues { (left, right) in result.append("\(left)\(right)") }

sampleeObserver.send(value: "a")
sampleeObserver.send(value: "b")
observer.send(value: 1)
expect(result) == [ "1b" ]
}

it("should do nothing if receiver fires before samplee sends value") {
var result: [String] = []
withLatestProducer.startWithValues { (left, right) in result.append("\(left)\(right)") }

observer.send(value: 1)
expect(result).to(beEmpty())
}

it("should send latest value with samplee value multiple times when receiver fires multiple times") {
var result: [String] = []
withLatestProducer.startWithValues { (left, right) in result.append("\(left)\(right)") }

sampleeObserver.send(value: "a")
observer.send(value: 1)
observer.send(value: 2)
expect(result) == [ "1a", "2a" ]
}

it("should complete when receiver has completed") {
var completed = false
withLatestProducer.startWithCompleted { completed = true }

observer.sendCompleted()
expect(completed) == true
}

it("should not affect when samplee has completed") {
var event: Event<(Int, String), NoError>? = nil
withLatestProducer.start { event = $0 }

sampleeObserver.sendCompleted()
expect(event).to(beNil())
}

it("should not affect when samplee has interrupted") {
var event: Event<(Int, String), NoError>? = nil
withLatestProducer.start { event = $0 }

sampleeObserver.sendInterrupted()
expect(event).to(beNil())
}
}

describe("combineLatestWith") {
var combinedProducer: SignalProducer<(Int, Double), NoError>!
var observer: Signal<Int, NoError>.Observer!
Expand Down
Loading