diff --git a/Sources/LiveKit/Core/DataChannelPair.swift b/Sources/LiveKit/Core/DataChannelPair.swift index bfe433eec..4ff5e88e3 100644 --- a/Sources/LiveKit/Core/DataChannelPair.swift +++ b/Sources/LiveKit/Core/DataChannelPair.swift @@ -77,7 +77,7 @@ class DataChannelPair: NSObject, Loggable { _reliableChannel = nil _lossyChannel = nil - openCompleter.cancel() + openCompleter.reset() // execute on .webRTC queue DispatchQueue.liveKitWebRTC.sync { diff --git a/Sources/LiveKit/Core/Room.swift b/Sources/LiveKit/Core/Room.swift index cd50f8950..ab608774c 100644 --- a/Sources/LiveKit/Core/Room.swift +++ b/Sources/LiveKit/Core/Room.swift @@ -267,8 +267,8 @@ extension Room { // Start Engine cleanUp sequence - engine.primaryTransportConnectedCompleter.cancel() - engine.publisherTransportConnectedCompleter.cancel() + engine.primaryTransportConnectedCompleter.reset() + engine.publisherTransportConnectedCompleter.reset() engine._state.mutate { // if isFullReconnect, keep connection related states diff --git a/Sources/LiveKit/Core/SignalClient.swift b/Sources/LiveKit/Core/SignalClient.swift index 5b1011305..1010e0316 100644 --- a/Sources/LiveKit/Core/SignalClient.swift +++ b/Sources/LiveKit/Core/SignalClient.swift @@ -165,7 +165,7 @@ class SignalClient: MulticastDelegate { _webSocket?.close() _webSocket = nil - _joinResponseCompleter.cancel() + _joinResponseCompleter.reset() latestJoinResponse = nil // Reset state diff --git a/Sources/LiveKit/Support/AsyncCompleter.swift b/Sources/LiveKit/Support/AsyncCompleter.swift index fc7453451..e1c688b21 100644 --- a/Sources/LiveKit/Support/AsyncCompleter.swift +++ b/Sources/LiveKit/Support/AsyncCompleter.swift @@ -60,7 +60,7 @@ actor CompleterMapActor { public func reset() { // Reset call completers... for (_, value) in _completerMap { - value.cancel() + value.reset() } // Clear all completers... _completerMap.removeAll() @@ -79,13 +79,15 @@ class AsyncCompleter: Loggable { private var _returningValue: T? private var _throwingError: Error? + private let _lock = UnfairLock() + public init(label: String, timeOut: DispatchTimeInterval) { self.label = label _timeOut = timeOut } deinit { - cancel() + reset() } private func _cancelTimer() { @@ -94,46 +96,48 @@ class AsyncCompleter: Loggable { _timeOutBlock = nil } - public func cancel() { - _cancelTimer() - if _continuation != nil { - log("\(label) cancelled") + public func reset() { + _lock.sync { + _cancelTimer() + if let continuation = _continuation { + log("\(label) cancelled") + continuation.resume(throwing: AsyncCompleterError.cancelled) + } + _continuation = nil + _returningValue = nil + _throwingError = nil } - _continuation?.resume(throwing: AsyncCompleterError.cancelled) - _continuation = nil - _returningValue = nil - _throwingError = nil } public func resume(returning value: T) { log("\(label)") - - _cancelTimer() - - _returningValue = value - _continuation?.resume(returning: value) - _continuation = nil + _lock.sync { + _cancelTimer() + _returningValue = value + _continuation?.resume(returning: value) + _continuation = nil + } } public func resume(throwing error: Error) { log("\(label)") - - _cancelTimer() - - _throwingError = error - _continuation?.resume(throwing: error) - _continuation = nil + _lock.sync { + _cancelTimer() + _throwingError = error + _continuation?.resume(throwing: error) + _continuation = nil + } } public func wait() async throws -> T { // resume(returning:) already called - if let returningValue = _returningValue { + if let returningValue = _lock.sync({ _returningValue }) { log("\(label) returning value...") return returningValue } // resume(throwing:) already called - if let throwingError = _throwingError { + if let throwingError = _lock.sync({ _throwingError }) { log("\(label) throwing error...") throw throwingError } @@ -141,31 +145,33 @@ class AsyncCompleter: Loggable { log("\(label) waiting...") // Cancel any previous waits - cancel() + reset() // Create a cancel-aware timed continuation return try await withTaskCancellationHandler { try await withCheckedThrowingContinuation { continuation in - // Store reference to continuation - _continuation = continuation - // Create time-out block let timeOutBlock = DispatchWorkItem { [weak self] in guard let self else { return } self.log("\(self.label) timedOut") - self._continuation?.resume(throwing: AsyncCompleterError.timedOut) - self._continuation = nil - self.cancel() + self._lock.sync { + self._continuation?.resume(throwing: AsyncCompleterError.timedOut) + self._continuation = nil + } + self.reset() + } + _lock.sync { + // Schedule time-out block + _queue.asyncAfter(deadline: .now() + _timeOut, execute: timeOutBlock) + // Store reference to continuation + _continuation = continuation + // Store reference to time-out block + _timeOutBlock = timeOutBlock } - - // Schedule time-out block - _queue.asyncAfter(deadline: .now() + _timeOut, execute: timeOutBlock) - // Store reference to time-out block - _timeOutBlock = timeOutBlock } } onCancel: { // Cancel completer when Task gets cancelled - cancel() + reset() } } } diff --git a/Sources/LiveKit/Track/Capturers/VideoCapturer.swift b/Sources/LiveKit/Track/Capturers/VideoCapturer.swift index 0e63af735..cb8368ba5 100644 --- a/Sources/LiveKit/Track/Capturers/VideoCapturer.swift +++ b/Sources/LiveKit/Track/Capturers/VideoCapturer.swift @@ -84,7 +84,7 @@ public class VideoCapturer: NSObject, Loggable, VideoCapturerProtocol { log("[publish] dimensions: \(String(describing: dimensions))") dimensionsCompleter.resume(returning: dimensions) } else { - dimensionsCompleter.cancel() + dimensionsCompleter.reset() } } } @@ -160,7 +160,7 @@ public class VideoCapturer: NSObject, Loggable, VideoCapturerProtocol { $0.capturer?(self, didUpdate: .stopped) } - dimensionsCompleter.cancel() + dimensionsCompleter.reset() return true }