Skip to content

Commit

Permalink
OpRepo: Use private dispatch queue instead of lock
Browse files Browse the repository at this point in the history
* In a previous commit, an unfair lock was used to synchronize access to the delta queue and synchronize flushing behavior
* A dispatch queue seems more appropriate for the Operation Repo to use considering it already polls and flushes on a global queue.
* Without the lock or dispatch queue, I reproduced crashes by creating multiple async threads that either added tags or called to flush the operation repo.
* With the dispatch queue, those crashes do no happen and behavior seems as expected.
  • Loading branch information
nan-li committed Mar 4, 2024
1 parent cb88aa6 commit 981aba6
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 34 deletions.
71 changes: 38 additions & 33 deletions iOS_SDK/OneSignalSDK/OneSignalOSCore/Source/OSOperationRepo.swift
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ import OneSignalCore
public class OSOperationRepo: NSObject {
public static let sharedInstance = OSOperationRepo()
private var hasCalledStart = false
private let deltaQueueLock = UnfairLock()

// The Operation Repo dispatch queue, serial. This synchronizes access to `deltaQueue` and flushing behavior.
private let dispatchQueue = DispatchQueue(label: "OneSignal.OSOperationRepo", target: .global())

// Maps delta names to the interfaces for the operation executors
var deltasToExecutorMap: [String: OSOperationExecutor] = [:]
Expand Down Expand Up @@ -63,7 +65,7 @@ public class OSOperationRepo: NSObject {
OneSignalLog.onesignalLog(.LL_VERBOSE, message: "OSOperationRepo calling start()")
// register as user observer
NotificationCenter.default.addObserver(self,
selector: #selector(self.flushDeltaQueue),
selector: #selector(self.addFlushDeltaQueueToDispatchQueue),
name: Notification.Name(OS_ON_USER_WILL_CHANGE),
object: nil)
// Read the Deltas from cache, if any...
Expand All @@ -77,7 +79,7 @@ public class OSOperationRepo: NSObject {
}

private func pollFlushQueue() {
DispatchQueue.global().asyncAfter(deadline: .now() + .milliseconds(pollIntervalMilliseconds)) { [weak self] in
self.dispatchQueue.asyncAfter(deadline: .now() + .milliseconds(pollIntervalMilliseconds)) { [weak self] in
self?.flushDeltaQueue()
self?.pollFlushQueue()
}
Expand All @@ -102,16 +104,21 @@ public class OSOperationRepo: NSObject {
return
}
start()
OneSignalLog.onesignalLog(.LL_VERBOSE, message: "OSOperationRepo enqueueDelta: \(delta)")

deltaQueueLock.locked {
deltaQueue.append(delta)
self.dispatchQueue.async {
OneSignalLog.onesignalLog(.LL_VERBOSE, message: "OSOperationRepo enqueueDelta: \(delta)")
self.deltaQueue.append(delta)
// Persist the deltas (including new delta) to storage
OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_OPERATION_REPO_DELTA_QUEUE_KEY, withValue: self.deltaQueue)
}
}

@objc public func flushDeltaQueue(inBackground: Bool = false) {
@objc public func addFlushDeltaQueueToDispatchQueue(inBackground: Bool = false) {
self.dispatchQueue.async {
self.flushDeltaQueue(inBackground: inBackground)
}
}

private func flushDeltaQueue(inBackground: Bool = false) {
guard !paused else {
OneSignalLog.onesignalLog(.LL_DEBUG, message: "OSOperationRepo not flushing queue due to being paused")
return
Expand All @@ -125,38 +132,36 @@ public class OSOperationRepo: NSObject {
OSBackgroundTaskManager.beginBackgroundTask(OPERATION_REPO_BACKGROUND_TASK)
}

start()
self.start()

deltaQueueLock.locked {
if !deltaQueue.isEmpty {
OneSignalLog.onesignalLog(.LL_VERBOSE, message: "OSOperationRepo flushDeltaQueue in background: \(inBackground) with queue: \(deltaQueue)")
}
if !self.deltaQueue.isEmpty {
OneSignalLog.onesignalLog(.LL_VERBOSE, message: "OSOperationRepo flushDeltaQueue in background: \(inBackground) with queue: \(self.deltaQueue)")
}

var index = 0
for delta in deltaQueue {
if let executor = deltasToExecutorMap[delta.name] {
executor.enqueueDelta(delta)
deltaQueue.remove(at: index)
} else {
// keep in queue if no executor matches, we may not have the executor available yet
index += 1
}
var index = 0
for delta in self.deltaQueue {
if let executor = self.deltasToExecutorMap[delta.name] {
executor.enqueueDelta(delta)
self.deltaQueue.remove(at: index)
} else {
// keep in queue if no executor matches, we may not have the executor available yet
index += 1
}
}

// Persist the deltas (including removed deltas) to storage after they are divvy'd up to executors.
OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_OPERATION_REPO_DELTA_QUEUE_KEY, withValue: self.deltaQueue)
// Persist the deltas (including removed deltas) to storage after they are divvy'd up to executors.
OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_OPERATION_REPO_DELTA_QUEUE_KEY, withValue: self.deltaQueue)

for executor in executors {
executor.cacheDeltaQueue()
}
for executor in self.executors {
executor.cacheDeltaQueue()
}

for executor in executors {
executor.processDeltaQueue(inBackground: inBackground)
}
for executor in self.executors {
executor.processDeltaQueue(inBackground: inBackground)
}

if inBackground {
OSBackgroundTaskManager.endBackgroundTask(OPERATION_REPO_BACKGROUND_TASK)
}
if inBackground {
OSBackgroundTaskManager.endBackgroundTask(OPERATION_REPO_BACKGROUND_TASK)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ final class OneSignalUserTests: XCTestCase {
for _ in 1...4 {
DispatchQueue.global().async {
print("🧪 flushDeltaQueue on thread \(Thread.current)")
OSOperationRepo.sharedInstance.flushDeltaQueue()
OSOperationRepo.sharedInstance.addFlushDeltaQueueToDispatchQueue()
}
}

Expand Down

0 comments on commit 981aba6

Please sign in to comment.