diff --git a/Sources/LiveKit/Broadcast/IPC/BroadcastUploader.swift b/Sources/LiveKit/Broadcast/IPC/BroadcastUploader.swift index 236dd1c9e..ab596fa5f 100644 --- a/Sources/LiveKit/Broadcast/IPC/BroadcastUploader.swift +++ b/Sources/LiveKit/Broadcast/IPC/BroadcastUploader.swift @@ -25,6 +25,8 @@ final class BroadcastUploader: Sendable { private let channel: IPCChannel private let imageCodec = BroadcastImageCodec() + @Atomic private var isUploading = false + enum Error: Swift.Error { case unsupportedSample } @@ -46,9 +48,13 @@ final class BroadcastUploader: Sendable { /// Upload a sample from ReplayKit. func upload(_ sampleBuffer: CMSampleBuffer, with type: RPSampleBufferType) async throws { - switch type { - case .video: try await sendImage(sampleBuffer) - default: throw Error.unsupportedSample + guard type == .video else { throw Error.unsupportedSample } + guard !isUploading else { return } + try await asyncDefer { + isUploading = true + try await sendImage(sampleBuffer) + } defer: { + isUploading = false } } @@ -88,4 +94,18 @@ private extension VideoRotation { } } +private func asyncDefer( + _ task: () async throws -> T, + defer cleanUp: () -> Void +) async rethrows -> T { + do { + let result = try await task() + cleanUp() + return result + } catch { + cleanUp() + throw error + } +} + #endif diff --git a/Sources/LiveKit/Broadcast/Uploader/Atomic.swift b/Sources/LiveKit/Broadcast/Uploader/Atomic.swift new file mode 100644 index 000000000..8f261cd44 --- /dev/null +++ b/Sources/LiveKit/Broadcast/Uploader/Atomic.swift @@ -0,0 +1,44 @@ +/* + * Copyright 2025 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import Foundation + +@propertyWrapper +struct Atomic { + private var value: Value + private let lock = NSLock() + + init(wrappedValue value: Value) { + self.value = value + } + + var wrappedValue: Value { + get { load() } + set { store(newValue: newValue) } + } + + func load() -> Value { + lock.lock() + defer { lock.unlock() } + return value + } + + mutating func store(newValue: Value) { + lock.lock() + defer { lock.unlock() } + value = newValue + } +}