-
Notifications
You must be signed in to change notification settings - Fork 133
/
DatadogCore.swift
528 lines (458 loc) · 20.4 KB
/
DatadogCore.swift
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
/*
* Unless explicitly stated otherwise all files in this repository are licensed under the Apache License Version 2.0.
* This product includes software developed at Datadog (https://www.datadoghq.com/).
* Copyright 2019-Present Datadog, Inc.
*/
import Foundation
import DatadogInternal
/// Core implementation of the Datadog SDK.
///
/// For every registered Feature the core sets up a storage and an upload unit,
/// configured from the core's performance presets and the Feature's own overrides.
///
/// Through its `DatadogCoreProtocol` conformance the core hands out the SDK context
/// and writing scopes that Features use to record events.
internal final class DatadogCore {
    /// Root location where this SDK instance keeps Features data.
    /// Each Feature gets its own set of subdirectories inside `CoreDirectory`, based on its storage configuration.
    let directory: CoreDirectory

    /// The GCD queue used for storage read and write operations.
    let readWriteQueue = DispatchQueue(
        label: "com.datadoghq.ios-sdk-read-write",
        autoreleaseFrequency: .workItem,
        target: .global(qos: .utility)
    )

    /// The system date provider.
    let dateProvider: DateProvider

    /// Publisher of the user's tracking consent.
    let consentPublisher: TrackingConsentPublisher

    /// The core SDK performance presets.
    let performance: PerformancePreset

    /// The HTTP client used for uploads.
    let httpClient: HTTPClient

    /// Optional on-disk data encryption.
    let encryption: DataEncryption?

    /// Publisher of the current user info; the `contextProvider` is subscribed to it in `init`.
    let userInfoPublisher = UserInfoPublisher()

    /// Publisher of the application version.
    let applicationVersionPublisher: ApplicationVersionPublisher

    /// The message-bus instance.
    let bus = MessageBus()

    /// Storage and upload units of registered Features, keyed by Feature name.
    @ReadWriteLock
    private(set) var stores: [String: (storage: FeatureStorage, upload: FeatureUpload)] = [:]

    /// Registered Feature instances, keyed by Feature name.
    @ReadWriteLock
    private var features: [String: DatadogFeature] = [:]

    /// The core context provider.
    internal let contextProvider: DatadogContextProvider

    /// Tells whether background tasks are enabled.
    internal let backgroundTasksEnabled: Bool

    /// Tells whether the SDK runs from an extension.
    internal let isRunFromExtension: Bool

    /// Maximum number of batches per upload.
    internal let maxBatchesPerUpload: Int

    /// Creates a core instance.
    ///
    /// - Parameters:
    ///   - directory: The core directory for this instance of the SDK.
    ///   - dateProvider: The system date provider.
    ///   - initialConsent: The initial user consent.
    ///   - performance: The core SDK performance presets.
    ///   - httpClient: The HTTP Client for uploads.
    ///   - encryption: The on-disk data encryption.
    ///   - contextProvider: The core context provider.
    ///   - applicationVersion: The application version.
    ///   - maxBatchesPerUpload: Maximum number of batches per upload.
    ///   - backgroundTasksEnabled: Whether background tasks are enabled.
    ///   - isRunFromExtension: Whether the SDK is run from an extension (`false` by default).
    init(
        directory: CoreDirectory,
        dateProvider: DateProvider,
        initialConsent: TrackingConsent,
        performance: PerformancePreset,
        httpClient: HTTPClient,
        encryption: DataEncryption?,
        contextProvider: DatadogContextProvider,
        applicationVersion: String,
        maxBatchesPerUpload: Int,
        backgroundTasksEnabled: Bool,
        isRunFromExtension: Bool = false
    ) {
        // Plain dependencies and flags:
        self.directory = directory
        self.dateProvider = dateProvider
        self.performance = performance
        self.httpClient = httpClient
        self.encryption = encryption
        self.contextProvider = contextProvider
        self.maxBatchesPerUpload = maxBatchesPerUpload
        self.backgroundTasksEnabled = backgroundTasksEnabled
        self.isRunFromExtension = isRunFromExtension

        // Publishers seeded with their initial values:
        self.applicationVersionPublisher = ApplicationVersionPublisher(version: applicationVersion)
        self.consentPublisher = TrackingConsentPublisher(consent: initialConsent)

        // Feed the context with values coming from the core's publishers:
        contextProvider.subscribe(\.userInfo, to: userInfoPublisher)
        contextProvider.subscribe(\.version, to: applicationVersionPublisher)
        contextProvider.subscribe(\.trackingConsent, to: consentPublisher)

        // Connect the core to the message bus.
        // The bus will keep a weak ref to the core.
        bus.connect(core: self)

        // Forward any context change on the message-bus.
        contextProvider.publish { [weak self] updatedContext in
            self?.send(message: .context(updatedContext))
        }
    }

    /// Sets current user information.
    ///
    /// Those will be added to logs, traces and RUM events automatically.
    ///
    /// - Parameters:
    ///   - id: User ID, if any
    ///   - name: Name representing the user, if any
    ///   - email: User's email, if any
    ///   - extraInfo: User's custom attributes, if any
    func setUserInfo(
        id: String? = nil,
        name: String? = nil,
        email: String? = nil,
        extraInfo: [AttributeKey: AttributeValue] = [:]
    ) {
        userInfoPublisher.current = UserInfo(
            id: id,
            name: name,
            email: email,
            extraInfo: extraInfo
        )
    }

    /// Adds or overrides the extra info of the current user.
    ///
    /// - Parameters:
    ///   - newExtraInfo: The user's custom attributes to add or override. Assigning a `nil` value
    ///                   removes the attribute stored under that key.
    func addUserExtraInfo(_ newExtraInfo: [AttributeKey: AttributeValue?]) {
        var merged = userInfoPublisher.current.extraInfo
        for (key, value) in newExtraInfo {
            merged[key] = value
        }
        userInfoPublisher.current.extraInfo = merged
    }

    /// Sets the tracking consent regarding the data collection for the Datadog SDK.
    ///
    /// Nothing happens when the given value equals the current consent.
    ///
    /// - Parameter trackingConsent: new consent value, which will be applied for all data collected from now on
    func set(trackingConsent: TrackingConsent) {
        guard trackingConsent != consentPublisher.consent else {
            return
        }

        // Snapshot the storages now, so the async block does not need to reach back into the core.
        let storages = allStorages
        contextProvider.queue.async {
            // RUM-3175: To prevent race conditions with ongoing "event write" operations,
            // data migration must be synchronized on the context queue. This guarantees that
            // all latest events have been written before migration occurs.
            for storage in storages {
                storage.migrateUnauthorizedData(toConsent: trackingConsent)
            }
        }

        consentPublisher.consent = trackingConsent
    }

    /// Clears all data that has not yet been uploaded to Datadog servers.
    func clearAllData() {
        for storage in allStorages {
            storage.clearAllData()
        }
        for dataStore in allDataStores {
            dataStore.clearAllData()
        }
    }

    /// Adds a message receiver to the bus.
    ///
    /// Once connected, the receiver is sent the current context from the bus queue.
    ///
    /// - Parameters:
    ///   - messageReceiver: The new message receiver.
    ///   - key: The key associated with the receiver.
    private func add(messageReceiver: FeatureMessageReceiver, forKey key: String) {
        bus.connect(messageReceiver, forKey: key)

        contextProvider.read { currentContext in
            self.bus.queue.async {
                messageReceiver.receive(message: .context(currentContext), from: self)
            }
        }
    }

    /// A list of storage units of currently registered Features.
    private var allStorages: [FeatureStorage] {
        stores.map { $0.value.storage }
    }

    /// A list of upload units of currently registered Features.
    private var allUploads: [FeatureUpload] {
        stores.map { $0.value.upload }
    }

    /// A list of data stores, one obtained from the scope of each currently registered Feature.
    private var allDataStores: [DataStore] {
        features.values.map { feature in
            let featureType = type(of: feature) as DatadogFeature.Type
            return scope(for: featureType).dataStore
        }
    }

    /// Awaits completion of all asynchronous operations, forces uploads (without retrying) and deinitializes
    /// this instance of the SDK. It **blocks the caller thread**.
    ///
    /// Upon return, it is safe to assume that all events were stored and got uploaded. The SDK was deinitialized,
    /// so this instance of the core is no longer functional.
    func flushAndTearDown() {
        flush()

        // At this point we can assume that all write operations completed and resulted with writing events to
        // storage. We now temporarily authorize storage for making all files readable ("uploadable") and perform
        // arbitrary uploads (without retrying on failure).
        for storage in allStorages {
            storage.setIgnoreFilesAgeWhenReading(to: true)
        }
        for upload in allUploads {
            upload.flushAndTearDown()
        }
        for storage in allStorages {
            storage.setIgnoreFilesAgeWhenReading(to: false)
        }

        stop()
    }

    /// Stops all processes for this instance of the Datadog core by emptying both registries,
    /// which releases the core's references to all Features and their storage & upload units.
    func stop() {
        stores = [:]
        features = [:]
    }
}
extension DatadogCore: DatadogCoreProtocol {
    /// Registers a Feature instance.
    ///
    /// A Feature collects and transfers data to a Datadog Product (e.g. Logs, RUM, ...). A registered Feature can
    /// open a `FeatureScope` to write events, the core will then be responsible for storing and uploading events
    /// in an efficient manner. Performance presets for storage and upload are defined when instantiating the core instance.
    ///
    /// Only Features that are `DatadogRemoteFeature` get a storage and an upload unit. Every Feature is
    /// added to the registry and has its message receiver connected to the bus managed by the core, so it
    /// can communicate with other Features.
    ///
    /// - Parameter feature: The Feature instance.
    /// - Throws: Rethrows the error from obtaining the Feature directories.
    func register<T>(feature: T) throws where T: DatadogFeature {
        if let remoteFeature = feature as? DatadogRemoteFeature {
            let featureDirectories = try directory.getFeatureDirectories(forFeatureNamed: T.name)

            // Apply the Feature's performance override on top of the core presets, if it declares one.
            let featurePerformance: PerformancePreset = remoteFeature.performanceOverride
                .map { performance.updated(with: $0) } ?? performance

            let featureStorage = FeatureStorage(
                featureName: T.name,
                queue: readWriteQueue,
                directories: featureDirectories,
                dateProvider: dateProvider,
                performance: featurePerformance,
                encryption: encryption,
                backgroundTasksEnabled: backgroundTasksEnabled,
                telemetry: telemetry
            )

            let featureUpload = FeatureUpload(
                featureName: T.name,
                contextProvider: contextProvider,
                fileReader: featureStorage.reader,
                requestBuilder: remoteFeature.requestBuilder,
                httpClient: httpClient,
                performance: featurePerformance,
                backgroundTasksEnabled: backgroundTasksEnabled,
                maxBatchesPerUpload: maxBatchesPerUpload,
                isRunFromExtension: isRunFromExtension,
                telemetry: telemetry
            )

            stores[T.name] = (storage: featureStorage, upload: featureUpload)

            // If there is any persisted data recorded with `.pending` consent,
            // it should be deleted on Feature startup:
            featureStorage.clearUnauthorizedData()
        }

        features[T.name] = feature
        add(messageReceiver: feature.messageReceiver, forKey: T.name)
    }

    /// Retrieves a Feature by its name and type.
    ///
    /// A Feature type can be specified as parameter or inferred from the return type:
    ///
    ///     let feature = core.feature(named: "foo", type: Foo.self)
    ///     let feature: Foo? = core.feature(named: "foo")
    ///
    /// - Parameters:
    ///   - name: The Feature's name.
    ///   - type: The Feature instance type.
    /// - Returns: The Feature registered under `name` if it can be cast to `T`, `nil` otherwise.
    func feature<T>(named name: String, type: T.Type) -> T? {
        let registered = features[name]
        return registered as? T
    }

    /// Creates a new scope bound to this core for the given Feature type.
    ///
    /// - Parameter featureType: The type of the Feature to create the scope for.
    /// - Returns: A `CoreFeatureScope` for that Feature.
    func scope<Feature>(for featureType: Feature.Type) -> FeatureScope where Feature: DatadogFeature {
        CoreFeatureScope<Feature>(in: self)
    }

    /// Writes the baggage produced by the given closure into the context under `key`.
    ///
    /// - Parameters:
    ///   - baggage: Closure producing the baggage; it is evaluated inside the context write.
    ///   - key: The baggage key.
    func set(baggage: @escaping () -> FeatureBaggage?, forKey key: String) {
        contextProvider.write { context in
            context.baggages[key] = baggage()
        }
    }

    /// Sends a message on the bus.
    ///
    /// - Parameters:
    ///   - message: The message to send.
    ///   - fallback: The fallback closure handed over to the bus along with the message.
    func send(message: FeatureMessage, else fallback: @escaping () -> Void) {
        bus.send(message: message, else: fallback)
    }
}
/// A `FeatureScope` backed by a `DatadogCore` instance.
///
/// The scope holds the core weakly: once the core is gone, its operations turn into no-ops
/// or fall back to no-op implementations (`NOPDataStore`, `NOPTelemetry`).
internal class CoreFeatureScope<Feature>: @unchecked Sendable, FeatureScope where Feature: DatadogFeature {
    /// The core this scope operates in (weak reference).
    private weak var core: DatadogCore?
    /// The data store created for this Feature in the core's directory.
    private let store: FeatureDataStore

    /// Creates a scope for `Feature` in the given core.
    ///
    /// - Parameter core: The core instance providing directory, read-write queue and telemetry.
    init(in core: DatadogCore) {
        self.core = core
        self.store = FeatureDataStore(
            feature: Feature.name,
            directory: core.directory,
            queue: core.readWriteQueue,
            telemetry: core.telemetry
        )
    }

    /// Requests the SDK context together with a writer for recording events.
    ///
    /// The block is not called when the core no longer exists or when the Feature has no storage.
    ///
    /// - Parameters:
    ///   - bypassConsent: When `true`, the writer for `.granted` consent is used regardless of
    ///                    the tracking consent found in the context.
    ///   - block: The block receiving the context and the writer.
    func eventWriteContext(bypassConsent: Bool, _ block: @escaping (DatadogContext, Writer) -> Void) {
        guard let core = core else {
            return // core is deinitialized
        }

        // Capture the storage reference so it is available until async block completion. This is to ensure
        // that we write events which were collected on the caller thread even if the core was released in the meantime.
        guard let storage = core.stores[Feature.name]?.storage else {
            let isFeatureRunning = core.get(feature: Feature.self) != nil
            if isFeatureRunning { // the feature is running, but has no storage
                DD.logger.error(
                    "Failed to obtain Event Write Context for '\(Feature.name)' because it is not a `DatadogRemoteFeature`."
                )
                #if DEBUG
                assertionFailure("Obtaining Event Write Context for '\(Feature.name)' but it is not a `DatadogRemoteFeature`.")
                #endif
            }
            return
        }

        // (on user thread) request SDK context
        context { sdkContext in
            // (on context thread) pick the writer and call the block
            let consent: TrackingConsent = bypassConsent ? .granted : sdkContext.trackingConsent
            block(sdkContext, storage.writer(for: consent))
        }
    }

    /// Requests the SDK context. The block is never called if the core no longer exists.
    ///
    /// - Parameter block: The block receiving the context.
    func context(_ block: @escaping (DatadogContext) -> Void) {
        // (on user thread) request SDK context
        core?.contextProvider.read { sdkContext in
            // (on context thread) call the block
            block(sdkContext)
        }
    }

    /// The Feature's data store, or a `NOPDataStore` when the core no longer exists.
    var dataStore: DataStore {
        if core == nil {
            return NOPDataStore() // only available when the core exists
        }
        return store
    }

    /// Sends a message through the core; does nothing if the core no longer exists.
    func send(message: FeatureMessage, else fallback: @escaping () -> Void) {
        core?.send(message: message, else: fallback)
    }

    /// Sets a baggage through the core; does nothing if the core no longer exists.
    func set(baggage: @escaping () -> FeatureBaggage?, forKey key: String) {
        core?.set(baggage: baggage, forKey: key)
    }

    /// The core's telemetry, or a `NOPTelemetry` when the core no longer exists.
    var telemetry: Telemetry {
        core?.telemetry ?? NOPTelemetry()
    }
}
extension DatadogContextProvider {
    /// Creates a core context provider with the given configuration.
    ///
    /// The initializer builds the initial `DatadogContext` and then subscribes the provider to
    /// the publishers available on the current platform (server time offset, launch time, network
    /// connection, carrier, battery, low power mode and application state).
    ///
    /// NOTE(review): the `version` and `sdkInitDate` parameters are never read by this body. The
    /// context's `version` is taken from `applicationVersion` and its `sdkInitDate` from
    /// `dateProvider.now`. Confirm with callers whether this is intentional.
    convenience init(
        site: DatadogSite,
        clientToken: String,
        service: String,
        env: String,
        version: String,
        buildNumber: String,
        buildId: String?,
        variant: String?,
        source: String,
        nativeSourceOverride: String?,
        sdkVersion: String,
        ciAppOrigin: String?,
        applicationName: String,
        applicationBundleIdentifier: String,
        applicationBundleType: BundleType,
        applicationVersion: String,
        sdkInitDate: Date,
        device: DeviceInfo,
        processInfo: ProcessInfo,
        dateProvider: DateProvider,
        serverDateProvider: ServerDateProvider,
        notificationCenter: NotificationCenter,
        appStateProvider: AppStateProvider
    ) {
        let initialContext = DatadogContext(
            site: site,
            clientToken: clientToken,
            service: service,
            env: env,
            version: applicationVersion,
            buildNumber: buildNumber,
            buildId: buildId,
            variant: variant,
            source: source,
            sdkVersion: sdkVersion,
            ciAppOrigin: ciAppOrigin,
            applicationName: applicationName,
            applicationBundleIdentifier: applicationBundleIdentifier,
            applicationBundleType: applicationBundleType,
            sdkInitDate: dateProvider.now,
            device: device,
            nativeSourceOverride: nativeSourceOverride,
            // This is a placeholder waiting for the `ApplicationStatePublisher`
            // to be initialized on the main thread; the value will be overridden
            // as soon as the subscription is made.
            applicationStateHistory: .active(since: dateProvider.now)
        )

        self.init(context: initialContext)

        self.subscribe(\.serverTimeOffset, to: ServerOffsetPublisher(provider: serverDateProvider))

        #if !os(macOS)
        self.subscribe(\.launchTime, to: LaunchTimePublisher())
        #endif

        self.subscribe(\.networkConnectionInfo, to: NWPathMonitorPublisher())

        #if os(iOS) && !targetEnvironment(macCatalyst) && !(swift(>=5.9) && os(visionOS))
        self.subscribe(\.carrierInfo, to: CarrierInfoPublisher())
        #endif

        #if os(iOS) && !targetEnvironment(simulator)
        self.subscribe(
            \.batteryStatus,
            to: BatteryStatusPublisher(notificationCenter: notificationCenter, device: .current)
        )
        self.subscribe(
            \.isLowPowerModeEnabled,
            to: LowPowerModePublisher(notificationCenter: notificationCenter, processInfo: processInfo)
        )
        #endif

        #if os(iOS) || os(tvOS)
        DispatchQueue.main.async {
            // Must be called on the main thread to read `UIApplication.State`.
            self.subscribe(
                \.applicationStateHistory,
                to: ApplicationStatePublisher(
                    appStateProvider: appStateProvider,
                    notificationCenter: notificationCenter,
                    dateProvider: dateProvider
                )
            )
        }
        #endif
    }
}
extension DatadogCore: Flushable {
    /// Flushes asynchronous operations related to events write, context and message bus propagation in this
    /// instance of the SDK, **blocking the caller thread** until they complete.
    ///
    /// Upon return, it is safe to assume that all events are stored. No assumption on their upload should be
    /// made - to force events upload use `flushAndTearDown()` instead.
    func flush() {
        // The order of flushing below must be considered cautiously and
        // follow our design choices around SDK core's threading.

        // Reset baggages that need not be persisted across flushes.
        set(baggage: nil, forKey: LaunchReport.baggageKey)

        // Snapshot of the registered Features that can be flushed.
        let flushableFeatures = self.features.values.compactMap { $0 as? Flushable }

        // The flushing is repeated few times, to make sure that operations spawned from other operations
        // on these queues are also awaited. Effectively, this is no different than short-time sleep() on current
        // thread and it has the same drawbacks (including: it might become flaky). Until we find a better solution
        // this is enough to get consistency in tests - but won't be reliable in any public "deinitialize" API.
        for _ in 1...5 {
            // First, flush bus queue - because messages can lead to obtaining "event write context" (reading
            // context & performing write) in other Features:
            bus.flush()

            // Next, flush flushable Features - finish current data collection to open "event write contexts":
            for flushable in flushableFeatures {
                flushable.flush()
            }

            // Next, flush context queue - because it indicates the entry point to "event write context" and
            // actual writes dispatched from it:
            contextProvider.flush()

            // Last, flush read-write queue - it always comes last, no matter if the write operation is dispatched
            // from "event write context" started on user thread OR if it happens upon receiving an "event" message
            // in other Feature:
            readWriteQueue.sync { }
        }
    }
}
extension DatadogCore: Storage {
    /// Returns the modification date of the most recently modified file in the core directory.
    ///
    /// The lookup runs synchronously on the read-write queue, so it blocks the caller until done.
    ///
    /// - Parameter before: The date to compare the last modification date of files.
    /// - Returns: The modification date of the latest modified file, or `nil` if no file was found
    ///            for the given date.
    /// - Throws: Rethrows errors from looking up the file or reading its modification date.
    func mostRecentModifiedFileAt(before: Date) throws -> Date? {
        try readWriteQueue.sync {
            try directory.coreDirectory.mostRecentModifiedFile(before: before)?.modifiedAt()
        }
    }
}
#if SPM_BUILD
import DatadogPrivate
#endif
/// Installs the Objective-C exception handler as `ObjcException.rethrow`.
///
/// The installation happens inside the initializer of this global constant, which Swift runs
/// lazily and only once - on first access. The stored value itself is a closure that does
/// nothing, so calling `registerObjcExceptionHandlerOnce()` repeatedly is harmless.
internal let registerObjcExceptionHandlerOnce: () -> Void = {
    ObjcException.rethrow = __dd_private_ObjcExceptionHandler.rethrow
    let noop: () -> Void = {}
    return noop
}()