Skip to content

Commit

Permalink
Processing chain interface (#552)
Browse files Browse the repository at this point in the history
General purpose chainable processor interface
  • Loading branch information
hiroshihorie authored Jan 20, 2025
1 parent f2523db commit e97ce08
Show file tree
Hide file tree
Showing 4 changed files with 253 additions and 0 deletions.
22 changes: 22 additions & 0 deletions Sources/LiveKit/Protocols/ChainedProcessor.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright 2025 LiveKit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import Foundation

public protocol ChainedProcessor: AnyObject {
// Next object in the chain.
var nextProcessor: Self? { get set }
}
76 changes: 76 additions & 0 deletions Sources/LiveKit/Support/ProcessingChain.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright 2025 LiveKit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import Foundation

public class ProcessingChain<T: ChainedProcessor>: NSObject, Loggable {
// MARK: - Public properties

public var isProcessorsEmpty: Bool { countProcessors == 0 }

public var isProcessorsNotEmpty: Bool { countProcessors != 0 }

public var countProcessors: Int {
_state.read { $0.processors.compactMap(\.value).count }
}

public var allProcessors: [T] {
_state.read { $0.processors.compactMap(\.value) }
}

// MARK: - Private properties

private struct State {
var processors = [WeakRef<T>]()
}

private let _state = StateSync(State())

public func add(processor: T) {
_state.mutate { $0.processors.append(WeakRef(processor)) }
}

public func remove(processor: T) {
_state.mutate {
$0.processors.removeAll { weakRef in
guard let value = weakRef.value else { return false }
return value === processor
}
}
}

public func removeAllProcessors() {
_state.mutate { $0.processors.removeAll() }
}

public func buildProcessorChain() -> T? {
let processors = _state.read { $0.processors.compactMap(\.value) }
guard !processors.isEmpty else { return nil }

for i in 0 ..< (processors.count - 1) {
processors[i].nextProcessor = processors[i + 1]
}
// The last one doesn't have a successor
processors.last?.nextProcessor = nil

return processors.first
}

public func invokeProcessor<R>(_ fnc: @escaping (T) -> R) -> R? {
guard let chain = buildProcessorChain() else { return nil }
return fnc(chain)
}
}
23 changes: 23 additions & 0 deletions Sources/LiveKit/Support/WeakRef.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright 2025 LiveKit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

final class WeakRef<T: AnyObject> {
weak var value: T?

init(_ value: T) {
self.value = value
}
}
132 changes: 132 additions & 0 deletions Tests/LiveKitTests/ProcessingChainTests.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* Copyright 2025 LiveKit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

@testable import LiveKit
import XCTest

class ProcessingChainTests: XCTestCase {
// Mock processor for testing
final class MockProcessor: NSObject, ChainedProcessor {
weak var nextProcessor: MockProcessor?

func process(value: Int) -> Int {
let result = value + 1
return nextProcessor?.process(value: result) ?? result
}
}

var chain: ProcessingChain<MockProcessor>!

override func setUp() {
super.setUp()
chain = ProcessingChain<MockProcessor>()
}

override func tearDown() {
chain = nil
super.tearDown()
}

func test_initialState() {
XCTAssertTrue(chain.isProcessorsEmpty)
XCTAssertFalse(chain.isProcessorsNotEmpty)
XCTAssertEqual(chain.countProcessors, 0)
XCTAssertTrue(chain.allProcessors.isEmpty)
}

func test_addProcessor() {
let processor = MockProcessor()
chain.add(processor: processor)

XCTAssertFalse(chain.isProcessorsEmpty)
XCTAssertTrue(chain.isProcessorsNotEmpty)
XCTAssertEqual(chain.countProcessors, 1)
XCTAssertEqual(chain.allProcessors.count, 1)
}

func test_removeProcessor() {
let processor = MockProcessor()
chain.add(processor: processor)
chain.remove(processor: processor)

XCTAssertTrue(chain.isProcessorsEmpty)
XCTAssertEqual(chain.countProcessors, 0)
XCTAssertTrue(chain.allProcessors.isEmpty)
}

func test_removeAllProcessors() {
let processor1 = MockProcessor()
let processor2 = MockProcessor()

chain.add(processor: processor1)
chain.add(processor: processor2)
XCTAssertEqual(chain.countProcessors, 2)

chain.removeAllProcessors()
XCTAssertTrue(chain.isProcessorsEmpty)
XCTAssertEqual(chain.countProcessors, 0)
}

func test_buildProcessorChain() {
let processor1 = MockProcessor()
let processor2 = MockProcessor()
let processor3 = MockProcessor()

chain.add(processor: processor1)
chain.add(processor: processor2)
chain.add(processor: processor3)

let builtChain = chain.buildProcessorChain()

XCTAssertNotNil(builtChain)
XCTAssertTrue(builtChain === processor1)
XCTAssertTrue(processor1.nextProcessor === processor2)
XCTAssertTrue(processor2.nextProcessor === processor3)
XCTAssertNil(processor3.nextProcessor)
}

func test_buildEmptyChain() {
XCTAssertNil(chain.buildProcessorChain())
}

func test_invokeProcessor() {
let processor1 = MockProcessor()
let processor2 = MockProcessor()
let processor3 = MockProcessor()

chain.add(processor: processor1)
chain.add(processor: processor2)
chain.add(processor: processor3)

let result = chain.invokeProcessor { $0.process(value: 0) }

// Each processor adds 1, so with 3 processors the final result should be 3
XCTAssertEqual(result, 3)
}

func test_weakReference() {
var processor: MockProcessor? = MockProcessor()
chain.add(processor: processor!)

XCTAssertEqual(chain.countProcessors, 1)

// Remove strong reference to processor
processor = nil

// Since we're using weak references, the processor should be removed
XCTAssertEqual(chain.countProcessors, 0)
}
}

0 comments on commit e97ce08

Please sign in to comment.