Skip to content

Commit

Permalink
Move to original implementation, disconnect on last unsubscribe
Browse files Browse the repository at this point in the history
  • Loading branch information
lawmicha committed Mar 20, 2020
1 parent 281fa33 commit 17c3b81
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,32 +39,4 @@ extension RealtimeConnectionProvider {
}

}

/// Check if there are any remaining subscription connections on the connection provider
/// If there are no remaining subscription connections, disconnect the underlying websocket
func disconnectIfNoRemainingSubscriptionConnections() {
guard status != .notConnected else {
return
}
serialCallbackQueue.async {[weak self] in
guard let self = self else {
return
}
if self.listeners.count == 0 && self.status == .connected {
AppSyncLogger.info("Realtime connection has no subscription connections, disconnecting.")
self.serialConnectionQueue.async { [weak self] in
guard let self = self else {
return
}
self.status = .notConnected
self.websocket.disconnect()
}

} else {
DispatchQueue.global().asyncAfter(deadline: DispatchTime.now() + self.unusedConnectionTimeout) { [weak self] in
self?.disconnectIfNoRemainingSubscriptionConnections()
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ extension RealtimeConnectionProvider: AppSyncWebsocketDelegate {
AppSyncLogger.debug("WebsocketDidConnect, sending init message...")
sendConnectionInitMessage()
disconnectIfStale()
disconnectIfNoRemainingSubscriptionConnections()
}

public func websocketDidDisconnect(provider: AppSyncWebsocketProvider, error: Error?) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ public class RealtimeConnectionProvider: ConnectionProvider {
var connectionInterceptors: [ConnectionInterceptor] = []

var staleConnectionTimeout = DispatchTimeInterval.seconds(5 * 60)
var unusedConnectionTimeout = DispatchTimeInterval.seconds(60)
var lastKeepAliveTime = DispatchTime.now()

/// Serial queue for websocket connection.
Expand All @@ -32,13 +31,9 @@ public class RealtimeConnectionProvider: ConnectionProvider {
let serialWriteQueue = DispatchQueue(label: "com.amazonaws.AppSyncRealTimeConnectionProvider.writeQueue")

public init(for url: URL,
websocket: AppSyncWebsocketProvider,
unusedConnectionTimeout: DispatchTimeInterval? = nil) {
websocket: AppSyncWebsocketProvider) {
self.url = url
self.websocket = websocket
if let unusedConnectionTimeout = unusedConnectionTimeout {
self.unusedConnectionTimeout = unusedConnectionTimeout
}
}

// MARK: - ConnectionProvider methods
Expand Down Expand Up @@ -106,7 +101,19 @@ public class RealtimeConnectionProvider: ConnectionProvider {

public func removeListener(identifier: String) {
serialCallbackQueue.async { [weak self] in
self?.listeners.removeValue(forKey: identifier)
guard let self = self else {
return
}

self.listeners.removeValue(forKey: identifier)

self.serialConnectionQueue.async { [weak self] in
guard let self = self else {
return
}
self.status = .notConnected
self.websocket.disconnect()
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,8 @@ public struct ConnectionProviderFactory {

public static func createConnectionProvider(for url: URL,
authInterceptor: AuthInterceptor,
connectionType: SubscriptionConnectionType,
unusedConnectionTimeout: DispatchTimeInterval? = nil) -> ConnectionProvider {
let provider = ConnectionProviderFactory.createConnectionProvider(for: url,
connectionType: connectionType,
unusedConnectionTimeout: unusedConnectionTimeout)
connectionType: SubscriptionConnectionType) -> ConnectionProvider {
let provider = ConnectionProviderFactory.createConnectionProvider(for: url, connectionType: connectionType)

if let messageInterceptable = provider as? MessageInterceptable {
messageInterceptable.addInterceptor(authInterceptor)
Expand All @@ -30,14 +27,11 @@ public struct ConnectionProviderFactory {
}

static func createConnectionProvider(for url: URL,
connectionType: SubscriptionConnectionType,
unusedConnectionTimeout: DispatchTimeInterval? = nil) -> ConnectionProvider {
connectionType: SubscriptionConnectionType) -> ConnectionProvider {
switch connectionType {
case .appSyncRealtime:
let websocketProvider = StarscreamAdapter()
let connectionProvider = RealtimeConnectionProvider(for: url,
websocket: websocketProvider,
unusedConnectionTimeout: unusedConnectionTimeout)
let connectionProvider = RealtimeConnectionProvider(for: url, websocket: websocketProvider)
return connectionProvider
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ class AppSyncRealTimeClientIntegrationTests: XCTestCase {
/// 1. Create a new connection provider
/// 2. Create multiple subscriptions
/// 3. Unsubscribe the subscriptions
/// 4. Wait for the unusedConnectionTimeout process to disconnect the socket
/// 4. Sleep to make sure the asynchronous process to disconnect the socket is executed
/// 5. Ensure the socket is disconnected
/// 6. Repeat Steps 2-5 with the existing connection provider.
///
Expand All @@ -98,17 +98,13 @@ class AppSyncRealTimeClientIntegrationTests: XCTestCase {
/// - Then:
/// - Underlying websocket is disconnected
func testAllSubscriptionsCancelledShouldDisconnectTheWebsocket() {
let timeoutInSeconds = 5
let unusedConnectionTimeout = DispatchTimeInterval.seconds(timeoutInSeconds)
let waitForTimeout = timeoutInSeconds*3
let connectedInvoked = expectation(description: "Connection established")
connectedInvoked.expectedFulfillmentCount = 3

let authInterceptor = APIKeyAuthInterceptor(apiKey)
let connectionProvider = ConnectionProviderFactory.createConnectionProvider(for: url,
authInterceptor: authInterceptor,
connectionType: .appSyncRealtime,
unusedConnectionTimeout: unusedConnectionTimeout)
connectionType: .appSyncRealtime)
let subscriptionConnection1 = AppSyncSubscriptionConnection(provider: connectionProvider)
let item1 = subscriptionConnection1.subscribe(requestString: requestString, variables: nil) { (event, item) in
if case let .connection(state) = event {
Expand Down Expand Up @@ -149,7 +145,10 @@ class AppSyncRealTimeClientIntegrationTests: XCTestCase {
subscriptionConnection2.unsubscribe(item: item2)
subscriptionConnection3.unsubscribe(item: item3)

sleep(UInt32(waitForTimeout))
// Sleep is required here as disconnecting the connection provider is done asynchronously on the connection
// queue for the very last unsubscribe. This means we need to "pull" for the status to ensure the system is operating correctly by sleeping
// and checking that the status is .notConnected
sleep(5)
XCTAssertEqual(realTimeConnectionProvider.status, .notConnected)

let newConnectedInvoked = expectation(description: "Connection established")
Expand All @@ -164,7 +163,7 @@ class AppSyncRealTimeClientIntegrationTests: XCTestCase {
wait(for: [newConnectedInvoked], timeout: TestCommonConstants.networkTimeout)
XCTAssertEqual(realTimeConnectionProvider.status, .connected)
subscriptionConnection4.unsubscribe(item: newItem)
sleep(UInt32(waitForTimeout))
sleep(5)
XCTAssertEqual(realTimeConnectionProvider.status, .notConnected)
}
}
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# AppSync RealTime Client for iOS

## 1.1.6

### Improvements

- Socket Disconnect when no remaining subscriptions. See [PR #8](https://github.com/aws-amplify/aws-appsync-realtime-client-ios/pull/8)

## 1.1.5

### Bug fix
Expand Down

0 comments on commit 17c3b81

Please sign in to comment.