forked from RxSwiftCommunity/Action
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Action.swift
135 lines (109 loc) · 4.18 KB
/
Action.swift
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
import Foundation
import RxSwift
import RxCocoa
/// Typealias for compatibility with UIButton's rx_action property.
/// An `Action` whose `Input` and `Element` are both `Void`.
public typealias CocoaAction = Action<Void, Void>
/// Possible errors from invoking execute().
/// Values of this type are what the action's `errors` sequence delivers.
public enum ActionError: ErrorType {
/// execute() was invoked but the action did not start executing
/// (it was not enabled at that moment).
case NotEnabled
/// The work observable started by execute() terminated with an error;
/// the associated value is that original error.
case UnderlyingError(ErrorType)
}
/// Wraps a unit of observable work that is started with `execute()`.
///
/// An action exposes, as observable sequences, the elements produced by its
/// work (`elements`), the failures of its invocations (`errors`), whether it
/// is currently running (`executing`) and whether it may be started
/// (`enabled`).
public final class Action<Input, Element> {
    /// Closure that builds the observable work for one input value.
    public typealias WorkFactory = Input -> Observable<Element>

    /// The `enabledIf` sequence given to `init`, mapped to plain `Bool` values.
    public let _enabledIf: Observable<Bool>

    /// The factory given to `init`.
    public let workFactory: WorkFactory

    private let _errors = PublishSubject<ActionError>()

    /// Errors aggregated from invocations of execute().
    /// Delivered on whatever scheduler they were sent from.
    public var errors: Observable<ActionError> {
        return _errors.asObservable()
    }

    private let _elements = PublishSubject<Element>()

    /// Elements aggregated from invocations of execute().
    /// Delivered on whatever scheduler they were sent from.
    public var elements: Observable<Element> {
        return _elements.asObservable()
    }

    private let _executing = Variable(false)

    /// Whether or not we're currently executing.
    /// Always observed on MainScheduler.
    public var executing: Observable<Bool> {
        let rawExecuting = _executing.asObservable()
        return rawExecuting.observeOn(MainScheduler.instance)
    }

    /// Backing subject for `enabled`. Starts out as `true` and is then driven
    /// by the binding set up in `init`.
    public private(set) var _enabled = BehaviorSubject(value: true)

    /// Whether or not we're enabled. Note that this is a *computed* sequence
    /// based on the `enabledIf` initializer argument and on whether we're
    /// currently executing.
    /// Always observed on MainScheduler.
    public var enabled: Observable<Bool> {
        let rawEnabled = _enabled.asObservable()
        return rawEnabled.observeOn(MainScheduler.instance)
    }

    /// Serial queue used to serialize the check-and-set of the executing state.
    private let executingQueue = dispatch_queue_create("com.ashfurrow.Action.executingQueue", DISPATCH_QUEUE_SERIAL)

    /// Owns the subscriptions created by this action.
    private let disposeBag = DisposeBag()

    /// Creates an action.
    ///
    /// - parameter enabledIf: Sequence of boolean-like values; the action is
    ///   enabled only while the latest value is true and it is not executing.
    /// - parameter workFactory: Builds the observable work for a given input.
    public init<B: BooleanType>(enabledIf: Observable<B>, workFactory: WorkFactory) {
        self._enabledIf = enabledIf.map { $0.boolValue }
        self.workFactory = workFactory

        // enabled == (latest enabledIf value) && !(currently executing).
        // Note that `executing` is observed on MainScheduler.
        let enabledAndIdle = Observable.combineLatest(_enabledIf, executing) { (isAllowed, isExecuting) -> Bool in
            return isAllowed && !isExecuting
        }
        enabledAndIdle
            .bindTo(_enabled)
            .addDisposableTo(disposeBag)
    }
}
// MARK: Convenience initializers.
public extension Action {
    /// Creates an action without an external enabling condition.
    ///
    /// `enabledIf` is a sequence that just emits `true`, so the action is
    /// enabled whenever it is not executing.
    ///
    /// - parameter workFactory: Builds the observable work for a given input.
    public convenience init(workFactory: WorkFactory) {
        let alwaysTrue: Observable<Bool> = Observable.just(true)
        self.init(enabledIf: alwaysTrue, workFactory: workFactory)
    }
}
// MARK: Execution!
public extension Action {
    /// Starts the work built by `workFactory` for `input`, provided the action
    /// is enabled and not already executing.
    ///
    /// While the work runs, its elements are forwarded to `elements`, and a
    /// failure is forwarded to `errors` wrapped in
    /// `ActionError.UnderlyingError`. The executing state is reset to `false`
    /// when the internal subscription to the work is disposed.
    ///
    /// - parameter input: Value handed to `workFactory`.
    /// - returns: An observable backed by an unbounded replay subject that
    ///   receives the work's events. If the action could not start, the
    ///   returned observable fails with `ActionError.NotEnabled`, and the same
    ///   error is sent to `errors`.
    public func execute(input: Input) -> Observable<Element> {
        // Buffer from the work to a replay subject.
        let buffer = ReplaySubject<Element>.createUnbounded()

        // Atomically check that we may start and, if so, mark ourselves as
        // executing.
        var startedExecuting = false
        self.doLocked {
            // `_enabled` is fed from `executing`, which is delivered through
            // MainScheduler, so it can lag behind `_executing`. Checking
            // `_executing` directly as well keeps two overlapping calls from
            // both starting work.
            if self._enabled.valueOrFalse && !self._executing.value {
                self._executing.value = true
                startedExecuting = true
            }
        }

        // Bail out if we did not start executing (disabled or already running).
        guard startedExecuting else {
            let error = ActionError.NotEnabled
            self._errors.onNext(error)
            buffer.onError(error)
            return buffer.asObservable()
        }

        let work = self.workFactory(input)

        // Mirror the buffered events onto the action-wide subjects before the
        // work is connected, so no event is missed.
        buffer.subscribe(onNext: { element in
                self._elements.onNext(element)
            },
            onError: { error in
                self._errors.onNext(ActionError.UnderlyingError(error))
            },
            onCompleted: nil,
            onDisposed: {
                self.doLocked { self._executing.value = false }
            })
            .addDisposableTo(disposeBag)

        // Subscribe to the work, now that the forwarding above is in place.
        work.multicast(buffer).connect()

        return buffer.asObservable()
    }
}
private extension Action {
    /// Runs `closure` synchronously on the serial `executingQueue`, so that
    /// blocks passed here never run concurrently with each other.
    ///
    /// Because this uses `dispatch_sync` on a serial queue, `closure` must not
    /// call `doLocked` again (directly or indirectly), or it will deadlock.
    private func doLocked(closure: () -> Void) {
        dispatch_sync(executingQueue) {
            closure()
        }
    }
}
internal extension BehaviorSubject where Element: BooleanLiteralConvertible {
    /// The subject's current value, or `false` if `value()` throws.
    var valueOrFalse: Element {
        do {
            return try value()
        } catch {
            return false
        }
    }
}