Skip to content

Commit

Permalink
Merge pull request #123 from surfstudio/SPT-1998-combine-support
Browse files Browse the repository at this point in the history
SPT-1998 Поддержка combine
  • Loading branch information
mrandrewsmith authored Apr 24, 2024
2 parents 3673334 + 2121a70 commit c0dcc12
Show file tree
Hide file tree
Showing 77 changed files with 2,104 additions and 275 deletions.
122 changes: 121 additions & 1 deletion NodeKit.xcodeproj/project.pbxproj

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions NodeKit/CacheNode/ETag/UrlETagReaderNode.swift
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ open class UrlETagReaderNode: AsyncNode {

/// Пытается прочесть eTag-токен из хранилища и добавить его к запросу.
/// В случае, если прочесть токен не удалось, то управление просто передается дальше.
open func process(_ data: TransportUrlRequest) -> Observer<Json> {
open func processLegacy(_ data: TransportUrlRequest) -> Observer<Json> {
guard let tag = UserDefaults.etagStorage?.value(forKey: data.url.absoluteString) as? String else {
return next.process(data)
return next.processLegacy(data)
}

var headers = data.headers
Expand All @@ -45,7 +45,7 @@ open class UrlETagReaderNode: AsyncNode {

let newData = TransportUrlRequest(with: params, raw: data.raw)

return next.process(newData)
return next.processLegacy(newData)
}

/// Пытается прочесть eTag-токен из хранилища и добавить его к запросу.
Expand Down
6 changes: 3 additions & 3 deletions NodeKit/CacheNode/ETag/UrlETagSaverNode.swift
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,17 @@ open class UrlETagSaverNode: AsyncNode {

/// Пытается получить eTag-токен по ключу.
/// В любом случае передает управление дальше.
open func process(_ data: UrlProcessedResponse) -> Observer<Void> {
open func processLegacy(_ data: UrlProcessedResponse) -> Observer<Void> {
guard let tag = data.response.allHeaderFields[self.eTagHeaderKey] as? String,
let url = data.request.url,
let urlAsKey = url.withOrderedQuery()
else {
return next?.process(data) ?? .emit(data: ())
return next?.processLegacy(data) ?? .emit(data: ())
}

UserDefaults.etagStorage?.set(tag, forKey: urlAsKey)

return next?.process(data) ?? .emit(data: ())
return next?.processLegacy(data) ?? .emit(data: ())
}

/// Пытается получить eTag-токен по ключу.
Expand Down
6 changes: 3 additions & 3 deletions NodeKit/CacheNode/FirstCachePolicyNode.swift
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,16 @@ open class FirstCachePolicyNode: AsyncStreamNode {
/// а затем, передает управление следующему узлу.
/// В случае, если получить `URLRequest` не удалось,
/// то управление просто передается следующему узлу
open func process(_ data: RawUrlRequest) -> Observer<Json> {
open func processLegacy(_ data: RawUrlRequest) -> Observer<Json> {
let result = Context<Json>()

if let request = data.toUrlRequest() {
cacheReaderNode.process(request)
cacheReaderNode.processLegacy(request)
.onCompleted { result.emit(data: $0) }
.onError { result.emit(error: $0)}
}

next.process(data)
next.processLegacy(data)
.onCompleted { result.emit(data: $0)}
.onError { result.emit(error: $0) }

Expand Down
6 changes: 3 additions & 3 deletions NodeKit/CacheNode/IfServerFailsFromCacheNode.swift
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,15 @@ open class IfConnectionFailedFromCacheNode: AsyncNode {
/// Проверяет, произошла ли ошибка связи в ответ на запрос.
/// Если ошибка произошла, то возвращает успешный ответ из кэша.
/// В противном случае передает управление следующему узлу.
open func process(_ data: URLRequest) -> Observer<Json> {
open func processLegacy(_ data: URLRequest) -> Observer<Json> {

return self.next.process(data).mapError { error -> Observer<Json> in
return self.next.processLegacy(data).mapError { error -> Observer<Json> in
var logMessage = self.logViewObjectName
logMessage += "Catching \(error)" + .lineTabDeilimeter
let request = UrlNetworkRequest(urlRequest: data)
if error is BaseTechnicalError {
logMessage += "Start read cache" + .lineTabDeilimeter
return self.cacheReaderNode.process(request)
return self.cacheReaderNode.processLegacy(request)
}
logMessage += "Error is \(type(of: error))"
logMessage += "and request = \(String(describing: request))" + .lineTabDeilimeter
Expand Down
2 changes: 1 addition & 1 deletion NodeKit/CacheNode/UrlCacheReaderNode.swift
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ open class UrlCacheReaderNode: AsyncNode {
}

/// Посылает запрос в кэш и пытается сериализовать данные в JSON.
open func process(_ data: UrlNetworkRequest) -> Observer<Json> {
open func processLegacy(_ data: UrlNetworkRequest) -> Observer<Json> {

guard let cachedResponse = self.extractCachedUrlResponse(data.urlRequest) else {
return self.needsToThrowError ? .emit(error: BaseUrlCacheReaderError.cantLoadDataFromCache) : Context<Json>()
Expand Down
2 changes: 1 addition & 1 deletion NodeKit/CacheNode/UrlCacheWriterNode.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ open class UrlCacheWriterNode: AsyncNode {

/// Формирует `CachedURLResponse` с политикой `.allowed`, сохраняет его в кэш,
/// а затем возвращает сообщение об успешной операции.
open func process(_ data: UrlProcessedResponse) -> Observer<Void> {
open func processLegacy(_ data: UrlProcessedResponse) -> Observer<Void> {
let cahced = CachedURLResponse(response: data.response, data: data.data, storagePolicy: .allowed)
URLCache.shared.storeCachedResponse(cahced, for: data.request)
return Context<Void>().emit(data: ())
Expand Down
6 changes: 3 additions & 3 deletions NodeKit/CacheNode/UrlNotModifiedTriggerNode.swift
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,12 @@ open class UrlNotModifiedTriggerNode: AsyncNode {

/// Проверяет http status-code. Если код соовуетствует NotModified, то возвращает запрос из кэша.
/// В протвином случае передает управление дальше.
open func process(_ data: UrlDataResponse) -> Observer<Json> {
open func processLegacy(_ data: UrlDataResponse) -> Observer<Json> {
guard data.response.statusCode == 304 else {
let log = makeErrorLog(code: data.response.statusCode)
return next.process(data).log(log)
return next.processLegacy(data).log(log)
}
return cacheReader.process(UrlNetworkRequest(urlRequest: data.request))
return cacheReader.processLegacy(UrlNetworkRequest(urlRequest: data.request))
}

/// Проверяет http status-code. Если код соовуетствует NotModified, то возвращает запрос из кэша.
Expand Down
6 changes: 3 additions & 3 deletions NodeKit/Chains/UrlChainsBuilder.swift
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ open class UrlChainsBuilder<Route: UrlRouteProvider> {
/// Для работы с этой цепочкой в качестве модели необходимо использовать `MultipartModel`
///
/// - Returns: Корневой узел цепочки .
open func build<I, O>() -> some Node<I, O> where O: DTODecodable, O.DTO.Raw == Json, I: DTOEncodable, I.DTO.Raw == MultipartModel<[String : Data]> {
open func build<I, O>() -> some AsyncNode<I, O> where O: DTODecodable, O.DTO.Raw == Json, I: DTOEncodable, I.DTO.Raw == MultipartModel<[String : Data]> {

let reponseProcessor = self.serviceChain.urlResponseProcessingLayerChain()

Expand All @@ -246,7 +246,7 @@ open class UrlChainsBuilder<Route: UrlRouteProvider> {

/// Позволяет загрузить бинарные данные (файл) с сервера без отправки какой-то модели на сервер.
/// - Returns: Корневой узел цепочки.
open func loadData() -> some Node<Void, Data> {
open func loadData() -> some AsyncNode<Void, Data> {
let loaderParser = DataLoadingResponseProcessor()
let errorProcessor = ResponseHttpErrorProcessorNode(next: loaderParser)
let responseProcessor = ResponseProcessorNode(next: errorProcessor)
Expand All @@ -273,7 +273,7 @@ open class UrlChainsBuilder<Route: UrlRouteProvider> {

/// Позволяет загрузить бинарные данные (файл) с сервера.
/// - Returns: Корневой узел цепочки.
open func loadData<Input>() -> some Node<Input, Data> where Input: DTOEncodable, Input.DTO.Raw == Json {
open func loadData<Input>() -> some AsyncNode<Input, Data> where Input: DTOEncodable, Input.DTO.Raw == Json {

let loaderParser = DataLoadingResponseProcessor()
let errorProcessor = ResponseHttpErrorProcessorNode(next: loaderParser)
Expand Down
53 changes: 53 additions & 0 deletions NodeKit/Core/Node/Async/AsyncNode.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
//
// AsyncNode.swift
// NodeKit
//
// Created by Andrei Frolov on 29.03.24.
// Copyright © 2024 Surf. All rights reserved.
//

import Combine
import Foundation

/// Протокол, наследованный от Node, добавляющий подход преобразования входных данных в результат с помощью SwiftConcurrency.
/// Применим для узлов, которые возвращают один результат.
public protocol AsyncNode<Input, Output>: Node {

/// Ассинхронный метод, который содержит логику для обработки данных
///
/// - Parameter data: Входные данные
/// - Returns: Результат обработки данных.
@discardableResult
func process(_ data: Input, logContext: LoggingContextProtocol) async -> NodeResult<Output>

/// Метод возвращающий объект для обработки результатов с помощью Combine.
///
/// - Returns: Узел, поддерживающий обработку результатов с помощью Combine.
func combineNode() -> any CombineNode<Input, Output>
}

public extension AsyncNode {

/// Метод process с созданием нового лог контекста.
@discardableResult
func process(_ data: Input) async -> NodeResult<Output> {
return await process(data, logContext: LoggingContext())
}

/// Стандартная реализация конвертации узла в ``CombineNode``.
///
/// - Returns: Узел, поддерживающий обработку результатов с помощью Combine.
func combineNode() -> any CombineNode<Input, Output> {
return AsyncCombineNode(node: self)
}
}

/// Содержит синтаксический сахар для работы с узлами, у которых входящий тип = `Void`
public extension AsyncNode where Input == Void {

/// Вызывает `process(_:)`
@discardableResult
func process() async -> NodeResult<Output> {
return await process(Void())
}
}
52 changes: 52 additions & 0 deletions NodeKit/Core/Node/Async/AsyncStreamNode.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
//
// AsyncStreamNode.swift
// NodeKit
//
// Created by Andrei Frolov on 29.03.24.
// Copyright © 2024 Surf. All rights reserved.
//

import Combine
import Foundation

/// Протокол, наследованный от Node, добавляющий подход преобразования входных данных в поток результатов с помощью SwiftConcurrency
/// Применим для узлов, которые могут вернуть несколько результатов
public protocol AsyncStreamNode<Input, Output>: Node {

/// Ассинхронный метод, который содержит логику для обработки данных
///
/// - Parameter data: Входные данные
/// - Returns: Поток результатов обработки данных.
@discardableResult
func process(_ data: Input, logContext: LoggingContextProtocol) -> AsyncStream<NodeResult<Output>>

/// Метод возвращающий объект для обработки результатов с помощью Combine.
///
/// - Returns: Узел, поддерживающий обработку результатов с помощью Combine.
func combineStreamNode() -> any CombineStreamNode<Input, Output>
}

public extension AsyncStreamNode {

/// Метод process с созданием нового лог контекста.
@discardableResult
func process(_ data: Input) -> AsyncStream<NodeResult<Output>> {
return process(data, logContext: LoggingContext())
}
/// Стандартная реализация конвертации узла в ``CombineStreamNode``.
///
/// - Returns: Узел, поддерживающий обработку результатов с помощью Combine.
func combineStreamNode() -> any CombineStreamNode<Input, Output> {
return AsyncStreamCombineNode(node: self)
}
}

/// Содержит синтаксический сахар для работы с узлами, у которых входящий тип = `Void`
public extension AsyncStreamNode where Input == Void {

/// Вызывает `process(_:)`
@discardableResult
func process() -> AsyncStream<NodeResult<Output>> {
return process(Void())
}
}
48 changes: 48 additions & 0 deletions NodeKit/Core/Node/Combine/AsyncCombineNode.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
//
// AsyncCombineNode.swift
// NodeKit
//
// Created by Andrei Frolov on 03.04.24.
// Copyright © 2024 Surf. All rights reserved.
//

import Combine

/// Реализация ``CombineNode`` для ``AsyncNode``.
public struct AsyncCombineNode<Input, Output>: CombineNode {

// MARK: - Private Properties

private let node: any AsyncNode<Input, Output>

// MARK: - Initialization

init(node: some AsyncNode<Input, Output>) {
self.node = node
}

// MARK: - CombineNode

/// Метод запускающий процесс обработки данных
/// и возвращающий publisher для подписки на результат.
///
/// - Parameters:
/// - data: Входные данные ноды.
/// - queue: Очередь на которой будут выдаваться результаты.
/// - logContext: Контекст логов.
/// - Returns: Publisher для подписки на результат.
public func nodeResultPublisher(for data: Input, on scheduler: some Scheduler, logContext: LoggingContextProtocol) -> AnyPublisher<NodeResult<Output>, Never> {
return Future<NodeResult<Output>, Never> { promise in
Task {
let result = await node.process(data, logContext: logContext)
promise(.success(result))
}
}
.receive(on: scheduler)
.eraseToAnyPublisher()
}

public func processLegacy(_ data: Input) -> Observer<Output> {
return node.processLegacy(data)
}
}
53 changes: 53 additions & 0 deletions NodeKit/Core/Node/Combine/AsyncStreamCombineNode.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
//
// AsyncStreamCombineNode.swift
// NodeKit
//
// Created by Andrei Frolov on 03.04.24.
// Copyright © 2024 Surf. All rights reserved.
//

import Foundation
import Combine

/// Реализация ``CombineStreamNode`` для ``AsyncStreamNode``.
public struct AsyncStreamCombineNode<Input, Output>: CombineStreamNode {

// MARK: - Private Properties

private let subject = PassthroughSubject<NodeResult<Output>, Never>()
private let node: any AsyncStreamNode<Input, Output>

// MARK: - Initialization

init(node: some AsyncStreamNode<Input, Output>) {
self.node = node
}

// MARK: - CombineNode

/// Publisher результата обработки данных.
/// - Parameter scheduler: Scheduler для выдачи результов.
public func nodeResultPublisher(on scheduler: some Scheduler) -> AnyPublisher<NodeResult<Output>, Never> {
return subject
.receive(on: scheduler)
.eraseToAnyPublisher()
}

/// Метод обработки данных протокола ``Node``. Будет удален в ближайшее время.
public func processLegacy(_ data: Input) -> Observer<Output> {
return node.processLegacy(data)
}

/// Метод запускающий процесс обработки данных.
///
/// - Parameters:
/// - data: Входные данные ноды.
/// - logContext: Контекст логов.
public func process(_ data: Input, logContext: LoggingContextProtocol) {
Task {
for await result in node.process(data, logContext: logContext) {
subject.send(result)
}
}
}
}
Loading

0 comments on commit c0dcc12

Please sign in to comment.