Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SPT-1998 Поддержка отмены таски для Combine (2) #137

Merged
merged 4 commits into from
May 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 70 additions & 35 deletions NodeKit/NodeKit.xcodeproj/project.pbxproj

Large diffs are not rendered by default.

33 changes: 22 additions & 11 deletions NodeKit/NodeKit/Core/Node/Async/AsyncNode.swift
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@
import Combine
import Foundation

/// Протокол, наследованный от Node, добавляющий подход преобразования входных данных в результат с помощью SwiftConcurrency.
/// Протокол ноды, описывающий подход преобразования входных данных в результат с помощью SwiftConcurrency.
/// Поддерживает обработку результатов с помощью Combine, наследуя протокол ``CombineCompatibleNode``.
/// Содержит параметры для логов, наследуя протокол ``LoggableNode``.
/// Применим для узлов, которые возвращают один результат.
public protocol AsyncNode<Input, Output>: Node {
public protocol AsyncNode<Input, Output>: LoggableNode, CombineCompatibleNode<Self.Input, Self.Output> {
associatedtype Input
associatedtype Output

Expand All @@ -22,11 +24,6 @@ public protocol AsyncNode<Input, Output>: Node {
@discardableResult
func process(_ data: Input, logContext: LoggingContextProtocol) async -> NodeResult<Output>

/// Метод, возвращающий объект для обработки результатов с помощью Combine.
///
/// - Returns: Узел, поддерживающий обработку результатов с помощью Combine.
func combineNode() -> any CombineNode<Input, Output>

/// Метод, возвращающий структуру-обертку текущей ноды.
/// Необходим для избежания проблем, возникающих при использовании any AsyncNode
///
Expand All @@ -48,11 +45,25 @@ public extension AsyncNode {
return await process(data, logContext: LoggingContext())
}

/// Стандартная реализация конвертации узла в ``CombineNode``.
/// Метод получения Publisher для подписки на результат.
/// Базовая реализация ``CombineCompatibleNode``.
/// При каждой подписке вызывает метод process с новой таской.
/// При вызове cancel вызывает cancel у таски.
///
/// - Returns: Узел, поддерживающий обработку результатов с помощью Combine.
func combineNode() -> any CombineNode<Input, Output> {
return AsyncCombineNode(node: self)
/// - Parameters:
/// - data: Входные данные ноды.
/// - scheduler: Scheduler для выдачи результата.
/// - logContext: Контекст логов.
/// - Returns: Publisher для подписки на результат.
@discardableResult
func nodeResultPublisher(
for data: Input,
on scheduler: some Scheduler,
logContext: LoggingContextProtocol
) -> AnyPublisher<NodeResult<Output>, Never> {
return AsyncNodeResultPublisher(node: self, input: data, logContext: logContext)
.receive(on: scheduler)
.eraseToAnyPublisher()
}

/// Метод, возвращающий структуру-обертку текущей ноды.
Expand Down
34 changes: 23 additions & 11 deletions NodeKit/NodeKit/Core/Node/Async/AsyncStreamNode.swift
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@
import Combine
import Foundation

/// Протокол, наследованный от Node, добавляющий подход преобразования входных данных в поток результатов с помощью SwiftConcurrency
/// Протокол ноды, описывающий подход преобразования входных данных в результат с помощью SwiftConcurrency.
/// Поддерживает обработку результатов с помощью Combine, наследуя протокол ``CombineCompatibleNode``.
/// Содержит параметры для логов, наследуя протокол ``LoggableNode``.
/// Применим для узлов, которые могут вернуть несколько результатов
public protocol AsyncStreamNode<Input, Output>: Node {
public protocol AsyncStreamNode<Input, Output>: LoggableNode, CombineCompatibleNode<Self.Input, Self.Output> {
associatedtype Input
associatedtype Output

Expand All @@ -22,11 +24,6 @@ public protocol AsyncStreamNode<Input, Output>: Node {
@discardableResult
func process(_ data: Input, logContext: LoggingContextProtocol) -> AsyncStream<NodeResult<Output>>

/// Метод возвращающий объект для обработки результатов с помощью Combine.
///
/// - Returns: Узел, поддерживающий обработку результатов с помощью Combine.
func combineStreamNode() -> any CombineStreamNode<Input, Output>

/// Метод, возвращающий структуру-обертку текущей ноды.
/// Необходим для избежания проблем, возникающих при использовании any AsyncStreamNode
///
Expand All @@ -41,11 +38,26 @@ public extension AsyncStreamNode {
func process(_ data: Input) -> AsyncStream<NodeResult<Output>> {
return process(data, logContext: LoggingContext())
}
/// Стандартная реализация конвертации узла в ``CombineStreamNode``.

/// Метод получения Publisher для подписки на результат.
/// Базовая реализация ``CombineCompatibleNode``.
/// При каждой подписке вызывает метод process с новой таской.
/// При вызове cancel вызывает cancel у таски.
///
/// - Returns: Узел, поддерживающий обработку результатов с помощью Combine.
func combineStreamNode() -> any CombineStreamNode<Input, Output> {
return AsyncStreamCombineNode(node: self)
/// - Parameters:
/// - data: Входные данные ноды.
/// - scheduler: Scheduler для выдачи результата.
/// - logContext: Контекст логов.
/// - Returns: Publisher для подписки на результат.
@discardableResult
func nodeResultPublisher(
for data: Input,
on scheduler: some Scheduler,
logContext: LoggingContextProtocol
) -> AnyPublisher<NodeResult<Output>, Never> {
return AsyncStreamNodeResultPublisher(node: self, input: data, logContext: logContext)
.receive(on: scheduler)
.eraseToAnyPublisher()
}

/// Метод, возвращающий структуру-обертку текущей ноды.
Expand Down
44 changes: 0 additions & 44 deletions NodeKit/NodeKit/Core/Node/Combine/AsyncCombineNode.swift

This file was deleted.

48 changes: 0 additions & 48 deletions NodeKit/NodeKit/Core/Node/Combine/AsyncStreamCombineNode.swift

This file was deleted.

122 changes: 122 additions & 0 deletions NodeKit/NodeKit/Core/Node/Combine/CombineCompatibleNode.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
//
// CombineCompatibleNode.swift
// NodeKit
//
// Created by Andrei Frolov on 17.04.24.
// Copyright © 2024 Surf. All rights reserved.
//

import Foundation
import Combine

/// Протокол ноды, описывающий подход преобразования входных данных в результат с помощью Combine.
public protocol CombineCompatibleNode<I, O> {
associatedtype I
associatedtype O

/// Метод получения Publisher для подписки на результат.
///
/// - Parameters:
/// - data: Входные данные ноды.
/// - scheduler: Scheduler для выдачи результата.
/// - logContext: Контекст логов.
/// - Returns: Publisher для подписки на результат.
func nodeResultPublisher(
for data: I,
on scheduler: some Scheduler,
logContext: LoggingContextProtocol
) -> AnyPublisher<NodeResult<O>, Never>
}

public extension CombineCompatibleNode {

/// Метод получения Publisher, возвращающего результат на главной очереди.
///
/// - Parameters:
/// - data: Входные данные ноды.
/// - logContext: Контекст логов.
/// - Returns: Publisher для подписки на результат.
@discardableResult
func nodeResultPublisher(
for data: I,
logContext: LoggingContextProtocol
) -> AnyPublisher<NodeResult<O>, Never> {
return nodeResultPublisher(for: data, on: DispatchQueue.main, logContext: logContext)
}

/// Метод получения Publisher с новым лог контекстом
/// и кастомного Scheduler для выдачи результата.
///
/// - Parameters:
/// - data: Входные данные ноды.
/// - scheduler: Scheduler для выдачи результата.
/// - Returns: Publisher для подписки на результат.
@discardableResult
func nodeResultPublisher(
for data: I,
on scheduler: some Scheduler
) -> AnyPublisher<NodeResult<O>, Never> {
return nodeResultPublisher(for: data, on: scheduler, logContext: LoggingContext())
}

/// Метод получения Publisher с новым лог контекстом, возвращающего результат на главной очереди.
///
/// - Parameters:
/// - data: Входные данные ноды.
/// - Returns: Publisher для подписки на результат.
@discardableResult
func nodeResultPublisher(
for data: I
) -> AnyPublisher<NodeResult<O>, Never> {
return nodeResultPublisher(for: data, on: DispatchQueue.main)
}
}

/// Содержит синтаксический сахар для работы с узлами, у которых входящий тип = `Void`
public extension CombineCompatibleNode where I == Void {

/// Метод получения Publisher с кастомным Scheduler.
///
/// - Parameters:
/// - scheduler: Scheduler для выдачи результата.
/// - logContext: Контекст логов.
/// - Returns: Publisher для подписки на результат.
func nodeResultPublisher(
on scheduler: some Scheduler,
logContext: LoggingContextProtocol
) -> AnyPublisher<NodeResult<O>, Never> {
return nodeResultPublisher(for: Void(), on: scheduler, logContext: logContext)
}

/// Метод получения Publisher с кастомным Scheduler и созданием нового лог контекста.
///
/// - Parameters:
/// - scheduler: Scheduler для выдачи результата.
/// - Returns: Publisher для подписки на результат.
@discardableResult
func nodeResultPublisher(
on scheduler: some Scheduler
) -> AnyPublisher<NodeResult<O>, Never> {
return nodeResultPublisher(for: Void(), on: scheduler)
}

/// Метод получения Publisher, возвращающего результат на главной очереди.
///
/// - Parameters:
/// - logContext: Контекст логов.
/// - Returns: Publisher для подписки на результат.
func nodeResultPublisher(
logContext: LoggingContextProtocol
) -> AnyPublisher<NodeResult<O>, Never> {
return nodeResultPublisher(for: Void(), logContext: logContext)
}

/// Метод получения Publisher с новым лог контекстом, возвращающего результат на главной очереди.
///
/// - Returns: Publisher для подписки на результат.
@discardableResult
func nodeResultPublisher(
) -> AnyPublisher<NodeResult<O>, Never> {
return nodeResultPublisher(for: Void())
}
}
Loading
Loading