Skip to content

Commit

Permalink
delay and worker Observers
Browse files Browse the repository at this point in the history
  • Loading branch information
psharanda committed Apr 17, 2017
1 parent 67fff1a commit 925b248
Show file tree
Hide file tree
Showing 7 changed files with 45 additions and 84 deletions.
50 changes: 20 additions & 30 deletions Jetpack.xcodeproj/project.pbxproj

Large diffs are not rendered by default.

File renamed without changes.
16 changes: 0 additions & 16 deletions Sources/Disposable.swift
Original file line number Diff line number Diff line change
Expand Up @@ -53,22 +53,6 @@ public final class CompositeDisposable: Disposable {
}
}

public final class SerialDisposable: Disposable {
private var disposable: Disposable?

public init() {
}

public func swap(with: Disposable) {
disposable = with
}

public func dispose() {
disposable?.dispose()
disposable = nil
}
}

public final class SwapableDisposable: Disposable {
private var parentDisposable: Disposable?
private var childDisposable: Disposable?
Expand Down
File renamed without changes.
34 changes: 0 additions & 34 deletions Sources/Observable+Dispatch.swift

This file was deleted.

8 changes: 4 additions & 4 deletions Sources/Observable+Task.swift
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ extension Observable where ValueType: ResultConvertible {
var numberOfRetries = 0

return Task<ResultValueType> { completion in
let serial = SerialDisposable()
let serial = SwapableDisposable()

func retryImpl() -> Disposable {
return self.start { result in
Expand All @@ -273,8 +273,8 @@ extension Observable where ValueType: ResultConvertible {
case .failure(let error):
numberOfRetries += 1
if until(error) && (numberOfRetries <= numberOfTimes) {
serial.swap(with: queue.after(timeInterval: currentTimeout) {
serial.swap(with: retryImpl())
serial.swap(child: queue.after(timeInterval: currentTimeout) {
serial.swap(parent: retryImpl())
})
currentTimeout = nextTimeout(currentTimeout)
} else {
Expand All @@ -284,7 +284,7 @@ extension Observable where ValueType: ResultConvertible {
}
}

serial.swap(with: retryImpl())
serial.swap(parent: retryImpl())
return serial
}
}
Expand Down
21 changes: 21 additions & 0 deletions Sources/Observer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,27 @@ public struct Observer<T>: Observable {
}

extension Observer {

/**
Create task with worker which will be run in workerQueue and send result to completionQueue. Worker can produce value or error.
*/
public init(workerQueue: DispatchQueue, completionQueue: DispatchQueue = .main, worker: @escaping () -> ValueType) {
self.init { completion in
return workerQueue.run(worker: worker, completionQueue: completionQueue) { (value: ValueType) in
completion(value)
}
}
}

public static func delay(timeInterval: TimeInterval, queue: DispatchQueue = .main) -> Observer<Void> {
return Observer<Void> { observer in
return queue.after(timeInterval: timeInterval) {
observer(())
}
}
}


public static func from(value: T) -> Observer<T> {
return Observer<T> { observer in
observer(value)
Expand Down

0 comments on commit 925b248

Please sign in to comment.