-
Notifications
You must be signed in to change notification settings - Fork 132
/
Copy pathAWSAppSyncClient.swift
417 lines (368 loc) · 26.6 KB
/
AWSAppSyncClient.swift
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
//
// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
// Licensed under the Amazon Software License
// http://aws.amazon.com/asl/
//
import Foundation
import AWSCore
/// Handler for subscription events. Receives an optional result, an optional read-write cache
/// transaction, and an optional error.
public typealias SubscriptionResultHandler<Operation: GraphQLSubscription> = (_ result: GraphQLResult<Operation.Data>?, _ transaction: ApolloStore.ReadWriteTransaction?, _ error: Error?) -> Void
/// Handler that receives an `AWSAppSyncSubscriptionWatcherStatus` value.
public typealias SubscriptionStatusChangeHandler = (AWSAppSyncSubscriptionWatcherStatus) -> Void
/// Handler for delta query results. Receives an optional result, an optional read-write cache
/// transaction, and an optional error.
public typealias DeltaQueryResultHandler<Operation: GraphQLQuery> = (_ result: GraphQLResult<Operation.Data>?, _ transaction: ApolloStore.ReadWriteTransaction?, _ error: Error?) -> Void
/// Closure that receives an optional read-write cache transaction. `AWSAppSyncClient.perform` runs it
/// inside a store read-write transaction before the mutation is added to the mutation queue.
public typealias OptimisticResponseBlock = (ApolloStore.ReadWriteTransaction?) -> Void
/// Closure passed to `AWSAppSyncClient.perform` as the conflict resolution block. It receives an optional
/// server-state snapshot, an optional task completion source for the mutation, and the optional result handler.
public typealias MutationConflictHandler<Mutation: GraphQLMutation> = (_ serverState: Snapshot?, _ taskCompletionSource: AWSTaskCompletionSource<Mutation>?, _ resultHandler: OperationResultHandler<Mutation>?) -> Void
/// Operation string used by the placeholder `EmptySubscription` and `EmptyQuery` operations.
internal let NoOpOperationString = "No-op"
/// Delegates will be notified when a mutation is performed from the `mutationCallback`. This pattern is necessary
/// in order to provide notifications of mutations which are performed after an app restart and the initial callback
/// context has been lost.
public protocol AWSAppSyncOfflineMutationDelegate {
/// Called to report the outcome of a mutation.
///
/// - Parameters:
///   - recordIdentifier: Identifier of the mutation record (presumably the persisted offline record; confirm
///     against the mutation queue implementation).
///   - operationString: The operation string of the mutation.
///   - snapshot: The result snapshot, if any.
///   - error: The error, if any.
func mutationCallback(recordIdentifier: String, operationString: String, snapshot: Snapshot?, error: Error?)
}
/// The client for making `Mutation`, `Query` and `Subscription` requests.
public class AWSAppSyncClient {
// Registry of cache database prefixes in use. Maps a prefix to a tuple of
// (endpoint URL + "_" + auth type, number of clients registered under that prefix).
// Within this class it is read and written only inside `prefixTrackerQueue.sync` (see `init` and `deinit`).
static var prefixTracker: [String: (String, Int)] = [:]
// Queue used to synchronize access to `prefixTracker`.
static var prefixTrackerQueue: DispatchQueue = DispatchQueue(label: "com.amazonaws.appsync.AWSAppSyncClient.clientDatabasePrefixTrackerQueue")
/// The Apollo client, created in `init` from the configured network transport and store.
public let apolloClient: ApolloClient?
/// The Apollo store taken from the client configuration.
public let store: ApolloStore?
/// The presigned URL generator taken from the client configuration.
public let presignedURLClient: AWSS3ObjectPresignedURLGenerator?
/// The S3 object manager taken from the client configuration.
public let s3ObjectManager: AWSS3ObjectManager?
// Network transport taken from the configuration; force-unwrapped in `init` when building the Apollo client
// and the mutation queue.
var httpTransport: AWSNetworkTransport?
// Factory used by the `subscribe` methods to obtain a realtime subscription connection.
var subscriptionConnectionFactory: SubscriptionConnectionFactory?
/// Delegate for offline mutation callbacks. Not referenced elsewhere in this class; presumably consulted by
/// the mutation queue (TODO confirm). Note that this is a strong reference.
public var offlineMutationDelegate: AWSAppSyncOfflineMutationDelegate?
// Created at the end of property setup in `init` because its initializer takes a reference to this client;
// hence the implicitly unwrapped optional.
private var mutationQueue: AWSPerformMutationQueue!
// Retry strategy taken from the client configuration.
var retryStrategy: AWSAppSyncRetryStrategy
// Key and value this client registered in `prefixTracker`. Both stay `nil` unless the cache configuration
// opts into prefixes; `deinit` uses the key to release the registration.
var prefixTrackerKey: String?
var prefixTrackerValue: String?
/// The count of Mutation operations queued for sending to the backend.
///
/// AppSyncClient processes both offline and online mutations, and mutations are queued for processing even while
/// the client is offline, so this count represents a good measure of the number of mutations that have yet to be
/// successfully sent to the service, regardless of the state of the network.
///
/// This value is `nil` if the mutationQueue cannot be accessed (e.g., has not finished initializing).
public var queuedMutationCount: Int? {
return mutationQueue?.operationQueueCount
}
// Handler notified from `appsyncReachabilityChanged(note:)` with the new network access state.
private var connectionStateChangeHandler: ConnectionStateChangeHandler?
// Copied from the configuration in `init`; not read anywhere else in this class.
private var autoSubmitOfflineMutations: Bool = false
// Queue handed to every subscription watcher created by the `subscribe` methods.
private var subscriptionsQueue = DispatchQueue(label: "SubscriptionsQueue", qos: .userInitiated)
// Subscription metadata cache from the configuration; passed to sync operations and cleared by `clearCaches`.
fileprivate var subscriptionMetadataCache: AWSSubscriptionMetaDataCache?
/// Creates a client from the given `AWSAppSyncClientConfiguration`, without a custom reachability factory.
///
/// Delegates to `init(appSyncConfig:reachabilityFactory:)`, passing `nil` for the factory.
///
/// - Parameter appSyncConfig: The `AWSAppSyncClientConfiguration` object.
/// - Throws: Any error thrown by `init(appSyncConfig:reachabilityFactory:)`.
public convenience init(appSyncConfig: AWSAppSyncClientConfiguration) throws {
    let noCustomFactory: NetworkReachabilityProvidingFactory.Type? = nil
    try self.init(appSyncConfig: appSyncConfig, reachabilityFactory: noCustomFactory)
}
/// Creates a client from the given `AWSAppSyncClientConfiguration` and an optional
/// `NetworkReachabilityProvidingFactory`.
///
/// This initializer is primarily intended to facilitate integration testing, but may also be used by apps
/// that want to supply their own reachability solution instead of AppSync's bundled reachability framework.
///
/// The client's use of the Reachability client provided by the factory is an implementation detail. In
/// particular, apps should not rely on AppSync to inspect network reachability status before attempting a
/// network connection.
///
/// - Note: The configured network transport and the host of the configured URL are force-unwrapped, so a
///   configuration lacking either one traps at runtime.
///
/// - Parameters:
///   - appSyncConfig: The `AWSAppSyncClientConfiguration` object.
///   - reachabilityFactory: An optional factory that provides `NetworkReachabilityProviding` instances.
/// - Throws: `AWSAppSyncClientConfigurationError.cacheConfigurationAlreadyInUse` when the cache configuration
///   uses a database prefix that is already registered by a client with a different URL/auth type combination.
public init(appSyncConfig: AWSAppSyncClientConfiguration,
            reachabilityFactory: NetworkReachabilityProvidingFactory.Type? = nil) throws {
    AppSyncLog.info("Initializing AppSyncClient")

    // Copy the configuration into stored properties. The order matters: every stored property must be
    // set before `self` is handed to the mutation queue below.
    autoSubmitOfflineMutations = appSyncConfig.autoSubmitOfflineMutations
    store = appSyncConfig.store
    presignedURLClient = appSyncConfig.presignedURLClient
    s3ObjectManager = appSyncConfig.s3ObjectManager
    subscriptionMetadataCache = appSyncConfig.subscriptionMetadataCache
    httpTransport = appSyncConfig.networkTransport
    connectionStateChangeHandler = appSyncConfig.connectionStateChangeHandler
    retryStrategy = appSyncConfig.retryStrategy
    apolloClient = ApolloClient(networkTransport: httpTransport!, store: appSyncConfig.store)
    subscriptionConnectionFactory = appSyncConfig.subscriptionConnectionFactory

    NetworkReachabilityNotifier.setupShared(
        host: appSyncConfig.url.host!,
        allowsCellularAccess: appSyncConfig.allowsCellularAccess,
        reachabilityFactory: reachabilityFactory)

    mutationQueue = AWSPerformMutationQueue(
        appSyncClient: self,
        networkClient: httpTransport!,
        reachabiltyChangeNotifier: NetworkReachabilityNotifier.shared,
        cacheFileURL: appSyncConfig.cacheConfiguration?.offlineMutations)

    NotificationCenter.default.addObserver(
        self,
        selector: #selector(appsyncReachabilityChanged(note:)),
        name: .appSyncReachabilityChanged,
        object: nil)

    // Register this client's database prefix. All access to the shared tracker goes through its queue.
    try AWSAppSyncClient.prefixTrackerQueue.sync {
        guard appSyncConfig.cacheConfiguration?.usePrefix ?? false else {
            return
        }

        let trackerKey = appSyncConfig.cacheConfiguration?.prefix ?? ""
        let authTypeName = appSyncConfig.authType?.rawValue ?? "unknown_auth"
        let trackerValue = appSyncConfig.url.absoluteString + "_" + authTypeName

        var registeredClients = 1
        if let (existingValue, existingCount) = AWSAppSyncClient.prefixTracker[trackerKey] {
            // A prefix may only be shared by clients that have the same URL and auth type.
            guard existingValue == trackerValue else {
                throw AWSAppSyncClientConfigurationError.cacheConfigurationAlreadyInUse("Configured two clients with the same database prefix")
            }
            registeredClients = existingCount + 1
        }
        AWSAppSyncClient.prefixTracker[trackerKey] = (trackerValue, registeredClients)

        // Remember the registration so `deinit` can release it.
        self.prefixTrackerKey = trackerKey
        self.prefixTrackerValue = trackerValue
    }
}
/// Clears the shared reachability notifier and releases this client's database prefix registration.
deinit {
    AppSyncLog.info("Releasing AppSyncClient")
    NetworkReachabilityNotifier.clearShared()

    AWSAppSyncClient.prefixTrackerQueue.sync {
        // Nothing to release unless this client registered a prefix that is still tracked.
        guard let trackerKey = self.prefixTrackerKey,
            let (trackerValue, clientCount) = AWSAppSyncClient.prefixTracker[trackerKey] else {
            return
        }

        if clientCount > 1 {
            // Other clients still share the prefix: only decrement the count.
            AWSAppSyncClient.prefixTracker[trackerKey] = (trackerValue, clientCount - 1)
        } else {
            // This was the last client registered under the prefix: drop the entry.
            AWSAppSyncClient.prefixTracker[trackerKey] = nil
        }
    }
}
/// Handles `.appSyncReachabilityChanged` notifications by forwarding the new network access state to the
/// configured connection state change handler.
///
/// Notifications whose `object` is not an `AppSyncConnectionInfo` are logged and ignored, so an unexpected
/// payload no longer crashes the app.
///
/// - Parameter note: The reachability notification; its `object` is expected to be an `AppSyncConnectionInfo`.
@objc func appsyncReachabilityChanged(note: Notification) {
    guard let connectionInfo = note.object as? AppSyncConnectionInfo else {
        AppSyncLog.error("Ignoring reachability notification without an AppSyncConnectionInfo payload")
        return
    }

    let accessState: ClientNetworkAccessState = connectionInfo.isConnectionAvailable ? .Online : .Offline
    connectionStateChangeHandler?.stateChanged(networkState: accessState)
}
/// Clears the Apollo cache.
///
/// - Returns: The promise returned by the store's `clearCache()`, or an already-fulfilled promise when the
///   client has no store.
@available(*, deprecated, message: "Use the clearCaches method that optionally takes in ClearCacheOptions")
public func clearCache() -> Promise<Void> {
    if let cacheStore = store {
        return cacheStore.clearCache()
    }
    return Promise(fulfilled: ())
}
/// Clears the Apollo cache, the offline mutation queue, and the delta sync subscription metadata.
///
/// Each selected cache is cleared independently: a failure in one does not stop the others from being
/// attempted. All failures are collected and reported together.
///
/// - Parameters:
///   - options: Fine-tunes which caches are cleared when calling this method. Defaults to clearing all of them.
/// - Throws: `ClearCacheError.failedToClear` carrying a map from cache type to the error raised while
///   clearing it, if at least one cache could not be cleared.
public func clearCaches(options: ClearCacheOptions = ClearCacheOptions(clearQueries: true, clearMutations: true, clearSubscriptions: true)) throws {
    var failures: [CacheType: Error] = [:]

    if options.clearQueries {
        do {
            try store?.clearCache().await()
        } catch {
            failures[.query] = error
        }
    }

    if options.clearMutations {
        do {
            try mutationQueue.clearQueue()
        } catch {
            failures[.mutation] = error
        }
    }

    if options.clearSubscriptions {
        do {
            try subscriptionMetadataCache?.clear()
        } catch {
            failures[.subscription] = error
        }
    }

    guard failures.isEmpty else {
        throw ClearCacheError.failedToClear(failures)
    }
}
/// Fetches a query from the server or from the local cache, depending on the current contents of the cache
/// and the specified cache policy.
///
/// - Note: The underlying Apollo client is force-unwrapped; the call traps if it is `nil`.
///
/// - Parameters:
///   - query: The query to fetch.
///   - cachePolicy: A cache policy that specifies when results should be fetched from the server and when
///     data should be loaded from the local cache.
///   - queue: A dispatch queue on which the result handler will be called. Defaults to the main queue.
///   - resultHandler: An optional closure that is called when query results are available or when an error occurs.
///   - result: The result of the fetched query, or `nil` if an error occurred.
///   - error: An error that indicates why the fetch failed, or `nil` if the fetch was successful.
/// - Returns: An object that can be used to cancel an in progress fetch.
@discardableResult
public func fetch<Query: GraphQLQuery>(query: Query,
                                       cachePolicy: CachePolicy = .returnCacheDataElseFetch,
                                       queue: DispatchQueue = DispatchQueue.main,
                                       resultHandler: OperationResultHandler<Query>? = nil) -> Cancellable {
    AppSyncLog.verbose("Fetching: \(query)")
    let client = apolloClient!
    return client.fetch(query: query, cachePolicy: cachePolicy, queue: queue, resultHandler: resultHandler)
}
/// Watches a query by first fetching an initial result from the server or from the local cache, depending on
/// the current contents of the cache and the specified cache policy. After the initial fetch, the returned
/// query watcher is notified whenever any of the data the query result depends on changes in the local cache,
/// and calls the result handler again with the new result.
///
/// - Note: The underlying Apollo client is force-unwrapped; the call traps if it is `nil`.
///
/// - Parameters:
///   - query: The query to fetch.
///   - cachePolicy: A cache policy that specifies when results should be fetched from the server or from the
///     local cache.
///   - queue: A dispatch queue on which the result handler will be called. Defaults to the main queue.
///   - resultHandler: A closure that is called when query results are available or when an error occurs.
///   - result: The result of the fetched query, or `nil` if an error occurred.
///   - error: An error that indicates why the fetch failed, or `nil` if the fetch was successful.
/// - Returns: A query watcher object that can be used to control the watching behavior.
public func watch<Query: GraphQLQuery>(query: Query,
                                       cachePolicy: CachePolicy = .returnCacheDataElseFetch,
                                       queue: DispatchQueue = DispatchQueue.main,
                                       resultHandler: @escaping OperationResultHandler<Query>) -> GraphQLQueryWatcher<Query> {
    let client = apolloClient!
    let queryWatcher = client.watch(query: query, cachePolicy: cachePolicy, queue: queue, resultHandler: resultHandler)
    return queryWatcher
}
/// Creates a subscription watcher for the given subscription, using an AppSync realtime connection.
///
/// - Parameters:
///   - subscription: The subscription to start.
///   - queue: A dispatch queue on which the handlers will be called. Defaults to the main queue.
///   - statusChangeHandler: An optional closure that receives subscription watcher status changes.
///   - resultHandler: A closure that is called when a subscription result is available or when an error occurs.
/// - Returns: The subscription watcher, or `nil` when the client has no subscription connection factory or
///   no store configured, in which case an error is logged. Previously these cases crashed on a force-unwrap.
public func subscribe<Subscription: GraphQLSubscription>(subscription: Subscription,
                                                         queue: DispatchQueue = DispatchQueue.main,
                                                         statusChangeHandler: SubscriptionStatusChangeHandler? = nil,
                                                         resultHandler: @escaping SubscriptionResultHandler<Subscription>) throws -> AWSAppSyncSubscriptionWatcher<Subscription>? {
    // Both a realtime connection and a store are required to build the watcher.
    guard let connection = subscriptionConnectionFactory?.connection(connectionType: .appSyncRealtime),
        let store = store else {
        AppSyncLog.error("Unable to subscribe: no subscription connection or store is available")
        return nil
    }

    return AWSAppSyncSubscriptionWatcher(connection: connection,
                                         store: store,
                                         subscriptionsQueue: subscriptionsQueue,
                                         subscription: subscription,
                                         handlerQueue: queue,
                                         statusChangeHandler: statusChangeHandler,
                                         resultHandler: resultHandler)
}
/// Creates a subscription watcher like `subscribe`, but takes a connected callback instead of a status
/// change handler.
///
/// - Parameters:
///   - subscription: The subscription to start.
///   - queue: A dispatch queue on which the handlers will be called. Defaults to the main queue.
///   - connectCallback: Closure passed to the watcher as its connected callback.
///   - resultHandler: A closure that is called when a subscription result is available or when an error occurs.
/// - Returns: The subscription watcher, or `nil` when the client has no subscription connection factory or
///   no store configured, in which case an error is logged. Previously these cases crashed on a force-unwrap.
internal func subscribeWithConnectCallback<Subscription: GraphQLSubscription>(subscription: Subscription,
                                                                              queue: DispatchQueue = DispatchQueue.main,
                                                                              connectCallback: @escaping (() -> Void),
                                                                              resultHandler: @escaping SubscriptionResultHandler<Subscription>) throws -> AWSAppSyncSubscriptionWatcher<Subscription>? {
    // Both a realtime connection and a store are required to build the watcher.
    guard let connection = subscriptionConnectionFactory?.connection(connectionType: .appSyncRealtime),
        let store = store else {
        AppSyncLog.error("Unable to subscribe: no subscription connection or store is available")
        return nil
    }

    return AWSAppSyncSubscriptionWatcher(connection: connection,
                                         store: store,
                                         subscriptionsQueue: subscriptionsQueue,
                                         subscription: subscription,
                                         handlerQueue: queue,
                                         statusChangeHandler: nil,
                                         connectedCallback: connectCallback,
                                         resultHandler: resultHandler)
}
/// Performs a mutation by sending it to the server. Mutations are added to an internal queue and performed
/// serially, in first-in, first-out order. The size of the queue can be inspected through the
/// `queuedMutationCount` property.
///
/// When an optimistic update is supplied, it is run inside a read-write transaction on the store before the
/// mutation is queued. An error from that transaction is logged and does not prevent queuing. If the client
/// has no store, the optimistic update is not run.
///
/// - Parameters:
///   - mutation: The mutation to perform.
///   - queue: A dispatch queue on which the result handler will be called. Defaults to the main queue.
///   - optimisticUpdate: An optional closure executed before the network call; use it to update the local
///     store through the `transaction` object.
///   - conflictResolutionBlock: An optional closure that is called when the mutation results in a conflict.
///   - resultHandler: An optional closure that is called when mutation results are available or when an error occurs.
///   - result: The result of the performed mutation, or `nil` if an error occurred.
///   - error: An error that indicates why the mutation failed, or `nil` if the mutation was successful.
/// - Returns: An object that can be used to cancel an in progress mutation.
@discardableResult
public func perform<Mutation: GraphQLMutation>(
    mutation: Mutation,
    queue: DispatchQueue = .main,
    optimisticUpdate: OptimisticResponseBlock? = nil,
    conflictResolutionBlock: MutationConflictHandler<Mutation>? = nil,
    resultHandler: OperationResultHandler<Mutation>? = nil) -> Cancellable {
    if let applyLocally = optimisticUpdate {
        do {
            // Apply the caller's local cache changes before the mutation is queued.
            _ = try store?.withinReadWriteTransaction { cacheTransaction in
                applyLocally(cacheTransaction)
            }.await()
        } catch {
            AppSyncLog.error("optimisticUpdate error: \(error)")
        }
    }

    let queuedMutation = mutationQueue.add(
        mutation,
        mutationConflictHandler: conflictResolutionBlock,
        mutationResultHandler: resultHandler,
        handlerQueue: queue
    )
    return queuedMutation
}
/// Placeholder subscription used by the `sync` overloads that do not take a subscription.
/// Its operation string is `NoOpOperationString` and its `Data` type declares no selections.
internal final class EmptySubscription: GraphQLSubscription {
public static var operationString: String = NoOpOperationString
/// Selection set with no selections and an empty default snapshot.
struct Data: GraphQLSelectionSet {
static var selections: [GraphQLSelection] = []
var snapshot: Snapshot = [:]
}
}
/// Placeholder query used as the delta query by the `sync` overload that takes only a base query.
/// Its operation string is `NoOpOperationString` and its `Data` type declares no selections.
internal final class EmptyQuery: GraphQLQuery {
public static var operationString: String = NoOpOperationString
/// Selection set with no selections and an empty default snapshot.
struct Data: GraphQLSelectionSet {
static var selections: [GraphQLSelection] = []
var snapshot: Snapshot = [:]
}
}
/// Performs a sync operation that combines a base query, a subscription and a delta query. The base query is
/// periodically called to fetch primary data from the server, based on the `syncConfiguration`.
///
/// - Parameters:
///   - baseQuery: The base query to fetch which contains the primary data.
///   - baseQueryResultHandler: Closure that is called when base query results are available or when an error
///     occurs. Every time a sync operation is called, a fetch for the baseQuery from the cache will be done
///     first before initiating any other operations.
///   - subscription: The subscription which will provide real time updates.
///   - subscriptionResultHandler: Closure that is called when a real time update is available or when an error occurs.
///   - deltaQuery: The delta query which fetches data starting from the `lastSync` time.
///   - deltaQueryResultHandler: Closure that is called when delta query executes.
///   - callbackQueue: An optional queue on which sync callbacks will be invoked. Defaults to the main queue.
///   - syncConfiguration: The sync configuration where the baseQuery sync interval can be specified. (Defaults to 24 hours.)
/// - Returns: An object that can be used to cancel the sync operation.
public func sync<BaseQuery: GraphQLQuery, Subscription: GraphQLSubscription, DeltaQuery: GraphQLQuery>(baseQuery: BaseQuery,
                                                                                                        baseQueryResultHandler: @escaping OperationResultHandler<BaseQuery>,
                                                                                                        subscription: Subscription,
                                                                                                        subscriptionResultHandler: @escaping SubscriptionResultHandler<Subscription>,
                                                                                                        deltaQuery: DeltaQuery,
                                                                                                        deltaQueryResultHandler: @escaping DeltaQueryResultHandler<DeltaQuery>,
                                                                                                        callbackQueue: DispatchQueue = DispatchQueue.main,
                                                                                                        syncConfiguration: SyncConfiguration = SyncConfiguration()) -> Cancellable {
    let syncOperation = AppSyncSubscriptionWithSync<Subscription, BaseQuery, DeltaQuery>(
        appSyncClient: self,
        baseQuery: baseQuery,
        deltaQuery: deltaQuery,
        subscription: subscription,
        baseQueryHandler: baseQueryResultHandler,
        deltaQueryHandler: deltaQueryResultHandler,
        subscriptionResultHandler: subscriptionResultHandler,
        subscriptionMetadataCache: subscriptionMetadataCache,
        syncConfiguration: syncConfiguration,
        handlerQueue: callbackQueue)
    return syncOperation
}
/// Performs a sync operation that combines a base query and a delta query, without a subscription. The base
/// query is periodically called to fetch primary data from the server, based on the `syncConfiguration`.
///
/// A placeholder `EmptySubscription` with a no-op result handler stands in for the missing subscription.
///
/// - Parameters:
///   - baseQuery: The base query to fetch which contains the primary data.
///   - baseQueryResultHandler: Closure that is called when base query results are available or when an error
///     occurs. Every time a sync operation is called, a fetch for the baseQuery from the cache will be done
///     first before initiating any other operations.
///   - deltaQuery: The delta query which fetches data starting from the `lastSync` time.
///   - deltaQueryResultHandler: Closure that is called when delta query executes.
///   - callbackQueue: An optional queue on which sync callbacks will be invoked. Defaults to the main queue.
///   - syncConfiguration: The sync configuration where the baseQuery sync interval can be specified. (Defaults to 24 hours.)
/// - Returns: An object that can be used to cancel the sync operation.
public func sync<BaseQuery: GraphQLQuery, DeltaQuery: GraphQLQuery>(baseQuery: BaseQuery,
                                                                    baseQueryResultHandler: @escaping OperationResultHandler<BaseQuery>,
                                                                    deltaQuery: DeltaQuery,
                                                                    deltaQueryResultHandler: @escaping DeltaQueryResultHandler<DeltaQuery>,
                                                                    callbackQueue: DispatchQueue = DispatchQueue.main,
                                                                    syncConfiguration: SyncConfiguration = SyncConfiguration()) -> Cancellable {
    // The compiler chokes on delegating to `AWSAppSyncClient.sync(baseQuery:baseQueryResultHandler:..)`, so the
    // sync operation is built directly in this method, at the expense of some code duplication.
    let placeholderSubscription = EmptySubscription()
    let ignoreSubscriptionEvents: SubscriptionResultHandler<EmptySubscription> = { _, _, _ in }

    let syncOperation = AppSyncSubscriptionWithSync<EmptySubscription, BaseQuery, DeltaQuery>(
        appSyncClient: self,
        baseQuery: baseQuery,
        deltaQuery: deltaQuery,
        subscription: placeholderSubscription,
        baseQueryHandler: baseQueryResultHandler,
        deltaQueryHandler: deltaQueryResultHandler,
        subscriptionResultHandler: ignoreSubscriptionEvents,
        subscriptionMetadataCache: subscriptionMetadataCache,
        syncConfiguration: syncConfiguration,
        handlerQueue: callbackQueue)
    return syncOperation
}
/// Performs a sync operation with only a base query, which is periodically called to fetch primary data from
/// the server, based on the `syncConfiguration`.
///
/// Placeholder `EmptySubscription` and `EmptyQuery` operations with no-op handlers stand in for the missing
/// subscription and delta query. The handlers are declared with the file's `SubscriptionResultHandler` and
/// `DeltaQueryResultHandler` typealiases, matching the other `sync` overloads.
///
/// - Parameters:
///   - baseQuery: The base query to fetch which contains the primary data.
///   - baseQueryResultHandler: Closure that is called when base query results are available or when an error
///     occurs. Every time a sync operation is called, a fetch for the baseQuery from the cache will be done
///     first before initiating any other operations.
///   - callbackQueue: An optional queue on which sync callbacks will be invoked. Defaults to the main queue.
///   - syncConfiguration: The sync configuration where the baseQuery sync interval can be specified. (Defaults to 24 hours.)
/// - Returns: An object that can be used to cancel the sync operation.
public func sync<BaseQuery: GraphQLQuery>(baseQuery: BaseQuery,
                                          baseQueryResultHandler: @escaping OperationResultHandler<BaseQuery>,
                                          callbackQueue: DispatchQueue = DispatchQueue.main,
                                          syncConfiguration: SyncConfiguration = SyncConfiguration()) -> Cancellable {
    let placeholderSubscription = EmptySubscription()
    let ignoreSubscriptionEvents: SubscriptionResultHandler<EmptySubscription> = { _, _, _ in }
    let placeholderDeltaQuery = EmptyQuery()
    let ignoreDeltaResults: DeltaQueryResultHandler<EmptyQuery> = { _, _, _ in }

    return AppSyncSubscriptionWithSync<EmptySubscription, BaseQuery, EmptyQuery>(
        appSyncClient: self,
        baseQuery: baseQuery,
        deltaQuery: placeholderDeltaQuery,
        subscription: placeholderSubscription,
        baseQueryHandler: baseQueryResultHandler,
        deltaQueryHandler: ignoreDeltaResults,
        subscriptionResultHandler: ignoreSubscriptionEvents,
        subscriptionMetadataCache: subscriptionMetadataCache,
        syncConfiguration: syncConfiguration,
        handlerQueue: callbackQueue)
}
}