Skip to content

Commit

Permalink
Refactor the threading model of the signal aggregators.
Browse files Browse the repository at this point in the history
  • Loading branch information
andersio committed Jul 3, 2017
1 parent 3c6d88a commit b82ea4f
Show file tree
Hide file tree
Showing 2 changed files with 205 additions and 59 deletions.
174 changes: 115 additions & 59 deletions Sources/Signal.swift
Original file line number Diff line number Diff line change
Expand Up @@ -1959,42 +1959,48 @@ private enum ThrottleWhileState<Value> {
}
}

private protocol SignalAggregateStrategy {
private protocol SignalAggregateStrategy: class {
/// Update the latest value of the signal at `position` to be `value`.
///
/// - parameters:
/// - value: The latest value emitted by the signal at `position`.
/// - position: The position of the signal.
///
/// - returns: `true` if the aggregating signal should terminate as a result of the
/// update. `false` otherwise.
mutating func update(_ value: Any, at position: Int) -> Bool
func update(_ value: Any, at position: Int)

/// Record the completion of the signal at `position`.
///
/// - parameters:
/// - position: The position of the signal.
///
/// - returns: `true` if the aggregating signal should terminate as a result of the
/// completion. `false` otherwise.
mutating func complete(at position: Int) -> Bool
func complete(at position: Int)

init(count: Int, action: @escaping (AggregateStrategyEvent) -> Void)
}

init(count: Int, action: @escaping (ContiguousArray<Any>) -> Void)
private enum AggregateStrategyEvent {
case value(ContiguousArray<Any>)
case completed
}

extension Signal {
private struct CombineLatestStrategy: SignalAggregateStrategy {
// Threading of `CombineLatestStrategy` and `ZipStrategy`.
//
// The threading models of these strategies mirror that of `Signal.Core` to allow
// recursive termial event from the upstreams that is triggered by the combined
// values.
//
// The strategies do not unique the delivery of `completed`, since `Signal` already
// guarantees that no event would ever be delivered after a terminal event.

private final class CombineLatestStrategy: SignalAggregateStrategy {
private enum Placeholder {
case none
}

private var values: ContiguousArray<Any>
private var completionCount: Int
private let action: (ContiguousArray<Any>) -> Void
var values: ContiguousArray<Any>

private var _haveAllSentInitial: Bool
private var haveAllSentInitial: Bool {
mutating get {
get {
if _haveAllSentInitial {
return true
}
Expand All @@ -2004,47 +2010,74 @@ extension Signal {
}
}

mutating func update(_ value: Any, at position: Int) -> Bool {
private let count: Int
private let lock: Lock

private let completion: Atomic<Int>
private let action: (AggregateStrategyEvent) -> Void

func update(_ value: Any, at position: Int) {
lock.lock()
values[position] = value

if haveAllSentInitial {
action(values)
action(.value(values))
}

return false
lock.unlock()

if completion.value == self.count, lock.try() {
action(.completed)
lock.unlock()
}
}

mutating func complete(at position: Int) -> Bool {
completionCount += 1
return completionCount == values.count
func complete(at position: Int) {
let count: Int = completion.modify { count in
count += 1
return count
}

if count == self.count, lock.try() {
action(.completed)
lock.unlock()
}
}

init(count: Int, action: @escaping (ContiguousArray<Any>) -> Void) {
values = ContiguousArray(repeating: Placeholder.none, count: count)
completionCount = 0
_haveAllSentInitial = false
init(count: Int, action: @escaping (AggregateStrategyEvent) -> Void) {
self.count = count
self.lock = Lock.make()
self.values = ContiguousArray(repeating: Placeholder.none, count: count)
self._haveAllSentInitial = false
self.completion = Atomic(0)
self.action = action
}
}

private struct ZipStrategy: SignalAggregateStrategy {
private final class ZipStrategy: SignalAggregateStrategy {
private let stateLock: Lock
private let sendLock: Lock

private var values: ContiguousArray<[Any]>
private var canEmit: Bool {
return values.reduce(true) { $0 && !$1.isEmpty }
}

private var hasConcurrentlyCompleted: Bool
private var isCompleted: ContiguousArray<Bool>
private let action: (ContiguousArray<Any>) -> Void

private var hasCompletedAndEmptiedSignal: Bool {
return Swift.zip(values, isCompleted).contains(where: { $0.0.isEmpty && $0.1 })
}

private var canEmit: Bool {
return values.reduce(true) { $0 && !$1.isEmpty }
}

private var areAllCompleted: Bool {
return isCompleted.reduce(true) { $0 && $1 }
}

mutating func update(_ value: Any, at position: Int) -> Bool {
private let action: (AggregateStrategyEvent) -> Void

func update(_ value: Any, at position: Int) {
stateLock.lock()
values[position].append(value)

if canEmit {
Expand All @@ -2055,33 +2088,61 @@ extension Signal {
buffer.append(values[index].removeFirst())
}

action(buffer)
let shouldComplete = areAllCompleted || hasCompletedAndEmptiedSignal
sendLock.lock()
stateLock.unlock()

if hasCompletedAndEmptiedSignal {
return true
action(.value(buffer))

if shouldComplete {
action(.completed)
}

sendLock.unlock()

stateLock.lock()

if hasConcurrentlyCompleted {
sendLock.lock()
action(.completed)
sendLock.unlock()
}
}

return false
stateLock.unlock()
}

mutating func complete(at position: Int) -> Bool {
func complete(at position: Int) {
stateLock.lock()
isCompleted[position] = true

// `zip` completes when all signals has completed, or any of the signals
// has completed without any buffered value.
return hasCompletedAndEmptiedSignal || areAllCompleted
if hasConcurrentlyCompleted || areAllCompleted || hasCompletedAndEmptiedSignal {
if sendLock.try() {
stateLock.unlock()

action(.completed)
sendLock.unlock()
return
}

hasConcurrentlyCompleted = true
}

stateLock.unlock()
}

init(count: Int, action: @escaping (ContiguousArray<Any>) -> Void) {
values = ContiguousArray(repeating: [], count: count)
isCompleted = ContiguousArray(repeating: false, count: count)
init(count: Int, action: @escaping (AggregateStrategyEvent) -> Void) {
self.values = ContiguousArray(repeating: [], count: count)
self.hasConcurrentlyCompleted = false
self.isCompleted = ContiguousArray(repeating: false, count: count)
self.action = action
self.sendLock = Lock.make()
self.stateLock = Lock.make()
}
}

private final class AggregateBuilder<Strategy: SignalAggregateStrategy> {
fileprivate var startHandlers: [(_ index: Int, _ strategy: Atomic<Strategy>, _ action: @escaping (Signal<Never, Error>.Event) -> Void) -> Disposable?]
fileprivate var startHandlers: [(_ index: Int, _ strategy: Strategy, _ action: @escaping (Signal<Never, Error>.Event) -> Void) -> Disposable?]

init() {
self.startHandlers = []
Expand All @@ -2093,22 +2154,10 @@ extension Signal {
return signal.observe { event in
switch event {
case let .value(value):
let shouldComplete = strategy.modify {
return $0.update(value, at: index)
}

if shouldComplete {
action(.completed)
}
strategy.update(value, at: index)

case .completed:
let shouldComplete = strategy.modify {
return $0.complete(at: index)
}

if shouldComplete {
action(.completed)
}
strategy.complete(at: index)

case .interrupted:
action(.interrupted)
Expand All @@ -2126,7 +2175,14 @@ extension Signal {
private convenience init<Strategy: SignalAggregateStrategy>(_ builder: AggregateBuilder<Strategy>, _ transform: @escaping (ContiguousArray<Any>) -> Value) {
self.init { observer in
let disposables = CompositeDisposable()
let strategy = Atomic(Strategy(count: builder.startHandlers.count) { observer.send(value: transform($0)) })
let strategy = Strategy(count: builder.startHandlers.count) { event in
switch event {
case let .value(value):
observer.send(value: transform(value))
case .completed:
observer.sendCompleted()
}
}

for (index, action) in builder.startHandlers.enumerated() where !disposables.isDisposed {
disposables += action(index, strategy) { observer.action($0.map { _ in fatalError() }) }
Expand Down
90 changes: 90 additions & 0 deletions Tests/ReactiveSwiftTests/SignalSpec.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2791,6 +2791,96 @@ class SignalSpec: QuickSpec {
aObserver.send(value: ())
bObserver.send(value: ())
}

it("should not deadlock upon recursive completion of the sources") {
let (a, aObserver) = Signal<(), NoError>.pipe()
let (b, bObserver) = Signal<(), NoError>.pipe()

Signal.zip(a, b)
.observeValues { _ in
aObserver.sendCompleted()
}

aObserver.send(value: ())
bObserver.send(value: ())
}

it("should not deadlock upon recursive interruption of the sources") {
let (a, aObserver) = Signal<(), NoError>.pipe()
let (b, bObserver) = Signal<(), NoError>.pipe()

Signal.zip(a, b)
.observeResult { _ in
aObserver.sendInterrupted()
}

aObserver.send(value: ())
bObserver.send(value: ())
}

it("should not deadlock upon recursive failure of the sources") {
let (a, aObserver) = Signal<(), TestError>.pipe()
let (b, bObserver) = Signal<(), TestError>.pipe()

Signal.zip(a, b)
.observeResult { _ in
aObserver.send(error: .default)
}

aObserver.send(value: ())
bObserver.send(value: ())
}

it("should not deadlock upon disposal") {
let (a, aObserver) = Signal<(), NoError>.pipe()
let (b, bObserver) = Signal<(), NoError>.pipe()

Signal.combineLatest(a, b)
.take(first: 1)
.observeValues { _ in }

aObserver.send(value: ())
bObserver.send(value: ())
}

it("should not deadlock upon recursive completion of the sources") {
let (a, aObserver) = Signal<(), NoError>.pipe()
let (b, bObserver) = Signal<(), NoError>.pipe()

Signal.combineLatest(a, b)
.observeValues { _ in
aObserver.sendCompleted()
}

aObserver.send(value: ())
bObserver.send(value: ())
}

it("should not deadlock upon recursive interruption of the sources") {
let (a, aObserver) = Signal<(), NoError>.pipe()
let (b, bObserver) = Signal<(), NoError>.pipe()

Signal.combineLatest(a, b)
.observeResult { _ in
aObserver.sendInterrupted()
}

aObserver.send(value: ())
bObserver.send(value: ())
}

it("should not deadlock upon recursive failure of the sources") {
let (a, aObserver) = Signal<(), TestError>.pipe()
let (b, bObserver) = Signal<(), TestError>.pipe()

Signal.combineLatest(a, b)
.observeResult { _ in
aObserver.send(error: .default)
}

aObserver.send(value: ())
bObserver.send(value: ())
}
}

describe("combineLatest") {
Expand Down

0 comments on commit b82ea4f

Please sign in to comment.