Skip to content

Commit

Permalink
Add Combine support to DarwinNotificationCenter
Browse files Browse the repository at this point in the history
Adds the ability to get a publisher for receiving notifications.
  • Loading branch information
ladvoc committed Jan 18, 2025
1 parent f71247f commit b9566c5
Show file tree
Hide file tree
Showing 2 changed files with 138 additions and 41 deletions.
132 changes: 91 additions & 41 deletions Sources/LiveKit/Broadcast/Uploader/DarwinNotificationCenter.swift
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* limitations under the License.
*/

import Combine
import Foundation

enum DarwinNotification: String {
Expand All @@ -23,7 +24,6 @@ enum DarwinNotification: String {

final class DarwinNotificationCenter: @unchecked Sendable {
public static let shared = DarwinNotificationCenter()

private let notificationCenter = CFNotificationCenterGetDarwinNotifyCenter()

func postNotification(_ name: DarwinNotification) {
Expand All @@ -33,50 +33,100 @@ final class DarwinNotificationCenter: @unchecked Sendable {
nil,
true)
}
}

extension DarwinNotificationCenter {
/// Returns a publisher that emits events when broadcasting notifications matching the given name.
func publisher(for name: DarwinNotification) -> Publisher {
Publisher(notificationCenter, name)
}

/// Returns an asynchronous sequence that emits a signal whenever a notification with the given name is received.
func notifications(named name: DarwinNotification) -> AsyncStream<Void> {
AsyncStream { continuation in
continuation.onTermination = { @Sendable _ in
self.stopObserving(name)
}
self.startObserving(name) {
continuation.yield()
}
/// A publisher that emits notifications.
struct Publisher: Combine.Publisher {
typealias Output = DarwinNotification
typealias Failure = Never

private let name: DarwinNotification
private let center: CFNotificationCenter?

fileprivate init(_ center: CFNotificationCenter?, _ name: DarwinNotification) {
self.name = name
self.center = center
}

func receive<S>(
subscriber: S
) where S: Subscriber, Never == S.Failure, DarwinNotification == S.Input {
subscriber.receive(subscription: Subscription(subscriber, center, name))
}
}

private var handlers = [DarwinNotification: () -> Void]()

private func startObserving(_ name: DarwinNotification, _ handler: @escaping () -> Void) {
handlers[name] = handler
CFNotificationCenterAddObserver(notificationCenter,
Unmanaged.passUnretained(self).toOpaque(),
Self.observationHandler,
name.rawValue as CFString,
nil,
.deliverImmediately)
}

private func stopObserving(_ name: DarwinNotification) {
CFNotificationCenterRemoveObserver(notificationCenter,
Unmanaged.passUnretained(self).toOpaque(),
CFNotificationName(name.rawValue as CFString),
nil)
handlers.removeValue(forKey: name)

private class SubscriptionBase {
let name: DarwinNotification
let center: CFNotificationCenter?

init(_ center: CFNotificationCenter?, _ name: DarwinNotification) {
self.name = name
self.center = center
}

static var callback: CFNotificationCallback = { _, observer, _, _, _ in
guard let observer else { return }
Unmanaged<SubscriptionBase>
.fromOpaque(observer)
.takeUnretainedValue()
.notifySubscriber()
}

func notifySubscriber() {
// Overridden by generic subclass to call specific subscriber's
// receive method. This allows forming a C function pointer to the callback.
}
}

private static let observationHandler: CFNotificationCallback = { _, observer, name, _, _ in
guard let observer else { return }
let center = Unmanaged<DarwinNotificationCenter>
.fromOpaque(observer)
.takeUnretainedValue()

guard let rawName = name?.rawValue as String?,
let name = DarwinNotification(rawValue: rawName),
let matchingHandler = center.handlers[name]
else { return }

private class Subscription<S: Subscriber>: SubscriptionBase, Combine.Subscription where S.Input == DarwinNotification, S.Failure == Never {
private var subscriber: S?

init(_ subscriber: S, _ center: CFNotificationCenter?, _ name: DarwinNotification) {
self.subscriber = subscriber
super.init(center, name)
addObserver()
}

func request(_ demand: Subscribers.Demand) {}

private var opaqueSelf: UnsafeRawPointer {
UnsafeRawPointer(Unmanaged.passUnretained(self).toOpaque())
}

matchingHandler()
private func addObserver() {
CFNotificationCenterAddObserver(center,
opaqueSelf,
Self.callback,
name.rawValue as CFString,
nil,
.deliverImmediately)
}

private func removeObserver() {
guard subscriber != nil else { return }
CFNotificationCenterRemoveObserver(center,
opaqueSelf,
CFNotificationName(name.rawValue as CFString),
nil)
subscriber = nil
}

override func notifySubscriber() {
_ = subscriber?.receive(name)
}

func cancel() {
removeObserver()
}

deinit {
removeObserver()
}
}
}
47 changes: 47 additions & 0 deletions Tests/LiveKitTests/DarwinNotificationCenterTests.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright 2025 LiveKit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import Combine
@testable import LiveKit
import XCTest

class DarwinNotificationCenterTests: XCTestCase {
func testPublisher() throws {
let receiveFirst = XCTestExpectation(description: "Receive from 1st subscriber")
let receiveSecond = XCTestExpectation(description: "Receive from 2nd subscriber")

let name = DarwinNotification.broadcastStarted

var cancellable = Set<AnyCancellable>()
DarwinNotificationCenter.shared
.publisher(for: name)
.sink {
XCTAssertEqual($0, name)
receiveFirst.fulfill()
}
.store(in: &cancellable)
DarwinNotificationCenter.shared
.publisher(for: name)
.sink {
XCTAssertEqual($0, name)
receiveSecond.fulfill()
}
.store(in: &cancellable)

DarwinNotificationCenter.shared.postNotification(name)
wait(for: [receiveFirst, receiveSecond], timeout: 10.0)
}
}

0 comments on commit b9566c5

Please sign in to comment.