Skip to content

Commit

Permalink
SPT-1998 fix pr and some improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
mrandrewsmith committed Apr 3, 2024
1 parent c3e306d commit 3a6b30c
Show file tree
Hide file tree
Showing 78 changed files with 1,520 additions and 1,181 deletions.
84 changes: 44 additions & 40 deletions NodeKit.xcodeproj/project.pbxproj

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions NodeKit/CacheNode/ETag/UrlETagReaderNode.swift
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ open class UrlETagReaderNode: AsyncNode {

/// Пытается прочесть eTag-токен из хранилища и добавить его к запросу.
/// В случае, если прочесть токен не удалось, то управление просто передается дальше.
open func process(_ data: TransportUrlRequest) -> Observer<Json> {
open func processLegacy(_ data: TransportUrlRequest) -> Observer<Json> {
guard let tag = UserDefaults.etagStorage?.value(forKey: data.url.absoluteString) as? String else {
return next.process(data)
return next.processLegacy(data)
}

var headers = data.headers
Expand All @@ -45,7 +45,7 @@ open class UrlETagReaderNode: AsyncNode {

let newData = TransportUrlRequest(with: params, raw: data.raw)

return next.process(newData)
return next.processLegacy(newData)
}

/// Пытается прочесть eTag-токен из хранилища и добавить его к запросу.
Expand Down
4 changes: 2 additions & 2 deletions NodeKit/CacheNode/ETag/UrlETagSaverNode.swift
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ open class UrlETagSaverNode: AsyncNode {

/// Пытается получить eTag-токен по ключу `UrlETagSaverNode.eTagHeaderKey`.
/// В любом случае передает управление дальше.
open func process(_ data: UrlProcessedResponse) -> Observer<Void> {
open func processLegacy(_ data: UrlProcessedResponse) -> Observer<Void> {
guard let tag = data.response.allHeaderFields[self.eTagHeaderKey] as? String,
let url = data.request.url,
let urlAsKey = url.withOrderedQuery()
Expand All @@ -51,7 +51,7 @@ open class UrlETagSaverNode: AsyncNode {

UserDefaults.etagStorage?.set(tag, forKey: urlAsKey)

return next?.process(data) ?? .emit(data: ())
return next?.processLegacy(data) ?? .emit(data: ())
}

/// Пытается получить eTag-токен по ключу `UrlETagSaverNode.eTagHeaderKey`.
Expand Down
6 changes: 3 additions & 3 deletions NodeKit/CacheNode/FirstCachePolicyNode.swift
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,16 @@ open class FirstCachePolicyNode: AsyncStreamNode {
/// а затем, передает управление следующему узлу.
/// В случае, если получить `URLRequest` не удалось,
/// то управление просто передается следующему узлу
open func process(_ data: RawUrlRequest) -> Observer<Json> {
open func processLegacy(_ data: RawUrlRequest) -> Observer<Json> {
let result = Context<Json>()

if let request = data.toUrlRequest() {
cacheReaderNode.process(request)
cacheReaderNode.processLegacy(request)
.onCompleted { result.emit(data: $0) }
.onError { result.emit(error: $0)}
}

next.process(data)
next.processLegacy(data)
.onCompleted { result.emit(data: $0)}
.onError { result.emit(error: $0) }

Expand Down
6 changes: 3 additions & 3 deletions NodeKit/CacheNode/IfServerFailsFromCacheNode.swift
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,15 @@ open class IfConnectionFailedFromCacheNode: AsyncNode {
/// Проверяет, произошла ли ошибка связи в ответ на запрос.
/// Если ошибка произошла, то возвращает успешный ответ из кэша.
/// В противном случае передает управление следующему узлу.
open func process(_ data: URLRequest) -> Observer<Json> {
open func processLegacy(_ data: URLRequest) -> Observer<Json> {

return self.next.process(data).mapError { error -> Observer<Json> in
return self.next.processLegacy(data).mapError { error -> Observer<Json> in
var logMessage = self.logViewObjectName
logMessage += "Catching \(error)" + .lineTabDeilimeter
let request = UrlNetworkRequest(urlRequest: data)
if error is BaseTechnicalError {
logMessage += "Start read cache" + .lineTabDeilimeter
return self.cacheReaderNode.process(request)
return self.cacheReaderNode.processLegacy(request)
}
logMessage += "Error is \(type(of: error))"
logMessage += "and request = \(String(describing: request))" + .lineTabDeilimeter
Expand Down
2 changes: 1 addition & 1 deletion NodeKit/CacheNode/UrlCacheReaderNode.swift
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ open class UrlCacheReaderNode: AsyncNode {
}

/// Посылает запрос в кэш и пытается сериализовать данные в JSON.
open func process(_ data: UrlNetworkRequest) -> Observer<Json> {
open func processLegacy(_ data: UrlNetworkRequest) -> Observer<Json> {

guard let cachedResponse = self.extractCachedUrlResponse(data.urlRequest) else {
return self.needsToThrowError ? .emit(error: BaseUrlCacheReaderError.cantLoadDataFromCache) : Context<Json>()
Expand Down
2 changes: 1 addition & 1 deletion NodeKit/CacheNode/UrlCacheWriterNode.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ open class UrlCacheWriterNode: AsyncNode {

/// Формирует `CachedURLResponse` с политикой `.allowed`, сохраняет его в кэш,
/// а затем возвращает сообщение об успешной операции.
open func process(_ data: UrlProcessedResponse) -> Observer<Void> {
open func processLegacy(_ data: UrlProcessedResponse) -> Observer<Void> {
let cahced = CachedURLResponse(response: data.response, data: data.data, storagePolicy: .allowed)
URLCache.shared.storeCachedResponse(cahced, for: data.request)
return Context<Void>().emit(data: ())
Expand Down
6 changes: 3 additions & 3 deletions NodeKit/CacheNode/UrlNotModifiedTriggerNode.swift
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,12 @@ open class UrlNotModifiedTriggerNode: AsyncNode {

/// Проверяет http status-code. Если код соовуетствует NotModified, то возвращает запрос из кэша.
/// В протвином случае передает управление дальше.
open func process(_ data: UrlDataResponse) -> Observer<Json> {
open func processLegacy(_ data: UrlDataResponse) -> Observer<Json> {
guard data.response.statusCode == 304 else {
let log = makeErrorLog(code: data.response.statusCode)
return next.process(data).log(log)
return next.processLegacy(data).log(log)
}
return cacheReader.process(UrlNetworkRequest(urlRequest: data.request))
return cacheReader.processLegacy(UrlNetworkRequest(urlRequest: data.request))
}

/// Проверяет http status-code. Если код соовуетствует NotModified, то возвращает запрос из кэша.
Expand Down
39 changes: 32 additions & 7 deletions NodeKit/Core/Node/Async/AsyncNode.swift
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@
// Copyright © 2024 Surf. All rights reserved.
//

import Combine
import Foundation

/// Протокол наследованный от Node, добавляющий подход преобразования входных данных в результат с помощью SwiftConcurrency
/// Применим для узлов, которые возвращают один результат
public protocol AsyncNode<Input, Output>: CombineConvertibleNode {
public protocol AsyncNode<Input, Output>: CombineNode {

/// Ассинхронный метод, который содержит логику для обработки данных
///
/// - Parameter data: Входные данные
Expand All @@ -20,23 +21,47 @@ public protocol AsyncNode<Input, Output>: CombineConvertibleNode {
func process(_ data: Input, logContext: LoggingContextProtocol) async -> NodeResult<Output>
}

/// Стандартная реализация CombineCompatibleNode.
public extension AsyncNode {

/// Метод запускающий процесс обработки данных.
///
/// - Parameters:
/// - data: Входные данные ноды.
/// - queue: Очередь на которой будут выдаваться результаты.
/// - logContext: Контекст логов.
/// - Returns: Publisher для подписки на результат.
@discardableResult
func nodeResultPublisher(
for data: Input,
on scheduler: some Scheduler,
logContext: LoggingContextProtocol
) -> AnyPublisher<NodeResult<Output>, Never> {
return Future<NodeResult<Output>, Never> { promise in
Task {
let result = await process(data, logContext: logContext)
promise(.success(result))
}
}
.receive(on: scheduler)
.eraseToAnyPublisher()
}
}

public extension AsyncNode {

/// Метод process с созданием нового лог контекста.
@discardableResult
func process(_ data: Input) async -> NodeResult<Output> {
return await process(data, logContext: LoggingContext())
}

/// Базовая реализация конвертации узла в ``CombineNode``.
func combineNode() -> any CombineNode<Input, Output> {
return CombineCompatibleNode(adapter: AsyncNodeAdapter(node: self))
}
}

/// Содержит иснтаксический сахар для работы с узлами, у которых входящий тип = `Void`
public extension AsyncNode where Input == Void {

/// Вызывает `process(_:)`
@discardableResult
func process() async -> NodeResult<Output> {
return await process(Void())
}
Expand Down
22 changes: 16 additions & 6 deletions NodeKit/Core/Node/Async/AsyncStreamNode.swift
Original file line number Diff line number Diff line change
Expand Up @@ -6,36 +6,46 @@
// Copyright © 2024 Surf. All rights reserved.
//

import Combine
import Foundation

/// Протокол наследованный от Node, добавляющий подход преобразования входных данных в поток результатов с помощью SwiftConcurrency
/// Применим для узлов, которые могут вернуть несколько результатов
public protocol AsyncStreamNode<Input, Output>: CombineConvertibleNode {
public protocol AsyncStreamNode<Input, Output>: Node {

/// Ассинхронный метод, который содержит логику для обработки данных
///
/// - Parameter data: Входные данные
/// - Returns: Поток результатов обработки данных.
@discardableResult
func process(_ data: Input, logContext: LoggingContextProtocol) -> AsyncStream<NodeResult<Output>>

/// Метод возвращающий объект для обработки результатов с помощью Combine.
///
/// - Returns: Узел, поддерживающий обработку результатов с помощью Combine.
func combineNode() -> any CombineStreamNode<Input, Output>
}

public extension AsyncStreamNode {

/// Метод process с созданием нового лог контекста.
@discardableResult
func process(_ data: Input) -> AsyncStream<NodeResult<Output>> {
return process(data, logContext: LoggingContext())
}

/// Базовая реализация конвертации узла в ``CombineNode``.
func combineNode() -> any CombineNode<Input, Output> {
return CombineCompatibleNode(adapter: AsyncStreamNodeAdapter(node: self))
/// Стандартная реализация конвертации узла в ``CombineNode``.
///
/// - Returns: Узел, поддерживающий обработку результатов с помощью Combine.
func combineNode() -> any CombineStreamNode<Input, Output> {
return AsyncStreamCombineNode(node: self)
}
}

/// Содержит иснтаксический сахар для работы с узлами, у которых входящий тип = `Void`
public extension AsyncStreamNode where Input == Void {

/// Вызывает `process(_:)`
@discardableResult
func process() -> AsyncStream<NodeResult<Output>> {
return process(Void())
}
Expand Down
53 changes: 53 additions & 0 deletions NodeKit/Core/Node/Combine/AsyncStreamCombineNode.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
//
// AsyncStreamCombineNode.swift
// NodeKit
//
// Created by Andrei Frolov on 03.04.24.
// Copyright © 2024 Surf. All rights reserved.
//

import Foundation
import Combine

/// Реализация ``PublisherContainerNode`` для ``AsyncStreamNode``.
public struct AsyncStreamCombineNode<Input, Output>: CombineStreamNode {

// MARK: - Private Properties

private let subject = PassthroughSubject<NodeResult<Output>, Never>()
private let node: any AsyncStreamNode<Input, Output>

// MARK: - Initialization

init(node: some AsyncStreamNode<Input, Output>) {
self.node = node
}

// MARK: - CombineNode

/// Publisher результата обработки данных.
/// - Parameter queue: Очередь на которой будут выдаваться результаты.
public func nodeResultPublisher(on scheduler: some Scheduler) -> AnyPublisher<NodeResult<Output>, Never> {
return subject
.receive(on: scheduler)
.eraseToAnyPublisher()
}

/// Метод обработки данных протокола ``Node``. Будет удален в ближайшее время.
public func processLegacy(_ data: Input) -> Observer<Output> {
return node.processLegacy(data)
}

/// Метод запускающий процесс обработки данных.
///
/// - Parameters:
/// - data: Входные данные ноды.
/// - logContext: Контекст логов.
public func process(_ data: Input, logContext: LoggingContextProtocol) {
Task {
for await result in node.process(data, logContext: logContext) {
subject.send(result)
}
}
}
}
18 changes: 0 additions & 18 deletions NodeKit/Core/Node/Combine/CombineConvertibleNode.swift

This file was deleted.

Loading

0 comments on commit 3a6b30c

Please sign in to comment.