From 0bd14d84f14b1b3730afdbe142c1cc73f8450a47 Mon Sep 17 00:00:00 2001 From: Christopher Vanderschuere Date: Sun, 25 Mar 2018 11:38:53 -0700 Subject: [PATCH 1/8] Implement connectivity state property and observers (#8) --- Sources/CgRPC/shim/cgrpc.h | 6 + Sources/CgRPC/shim/channel.c | 5 + Sources/CgRPC/shim/completion_queue.c | 4 + Sources/SwiftGRPC/Core/Channel.swift | 148 +++++++++++++++++- .../SwiftGRPC/Core/ConnectivityState.swift | 32 ---- 5 files changed, 159 insertions(+), 36 deletions(-) delete mode 100644 Sources/SwiftGRPC/Core/ConnectivityState.swift diff --git a/Sources/CgRPC/shim/cgrpc.h b/Sources/CgRPC/shim/cgrpc.h index 87a82670e..6f1929d08 100644 --- a/Sources/CgRPC/shim/cgrpc.h +++ b/Sources/CgRPC/shim/cgrpc.h @@ -147,6 +147,11 @@ cgrpc_completion_queue *cgrpc_channel_completion_queue(cgrpc_channel *channel); grpc_connectivity_state cgrpc_channel_check_connectivity_state( cgrpc_channel *channel, int try_to_connect); +void cgrpc_channel_watch_connectivity_state(cgrpc_channel *channel, + cgrpc_completion_queue * completion_queue, + grpc_connectivity_state last_observed_state, + double deadline, + void *tag); // server support cgrpc_server *cgrpc_server_create(const char *address); @@ -159,6 +164,7 @@ void cgrpc_server_start(cgrpc_server *s); cgrpc_completion_queue *cgrpc_server_get_completion_queue(cgrpc_server *s); // completion queues +cgrpc_completion_queue *cgrpc_completion_queue_create_for_next(); grpc_event cgrpc_completion_queue_get_next_event(cgrpc_completion_queue *cq, double timeout); void cgrpc_completion_queue_drain(cgrpc_completion_queue *cq); diff --git a/Sources/CgRPC/shim/channel.c b/Sources/CgRPC/shim/channel.c index 3ddcba0c2..1d4cb34dc 100644 --- a/Sources/CgRPC/shim/channel.c +++ b/Sources/CgRPC/shim/channel.c @@ -103,3 +103,8 @@ cgrpc_completion_queue *cgrpc_channel_completion_queue(cgrpc_channel *channel) { grpc_connectivity_state cgrpc_channel_check_connectivity_state(cgrpc_channel *channel, int try_to_connect) { return grpc_channel_check_connectivity_state(channel->channel, try_to_connect); } + +void cgrpc_channel_watch_connectivity_state(cgrpc_channel *channel, cgrpc_completion_queue * completion_queue, grpc_connectivity_state last_observed_state, double deadline, void *tag) { + gpr_timespec deadline_seconds = cgrpc_deadline_in_seconds_from_now(deadline); + return grpc_channel_watch_connectivity_state(channel->channel, last_observed_state, deadline_seconds, completion_queue, tag); +} diff --git a/Sources/CgRPC/shim/completion_queue.c b/Sources/CgRPC/shim/completion_queue.c index 726f9a4ed..7c5fa1351 100644 --- a/Sources/CgRPC/shim/completion_queue.c +++ b/Sources/CgRPC/shim/completion_queue.c @@ -18,6 +18,10 @@ #include +grpc_completion_queue *cgrpc_completion_queue_create_for_next() { + return grpc_completion_queue_create_for_next(NULL); +} + grpc_event cgrpc_completion_queue_get_next_event(grpc_completion_queue *cq, double timeout) { gpr_timespec deadline = cgrpc_deadline_in_seconds_from_now(timeout); if (timeout < 0) { diff --git a/Sources/SwiftGRPC/Core/Channel.swift b/Sources/SwiftGRPC/Core/Channel.swift index f0ca4c61f..f04a9b0e5 100644 --- a/Sources/SwiftGRPC/Core/Channel.swift +++ b/Sources/SwiftGRPC/Core/Channel.swift @@ -31,10 +31,9 @@ public class Channel { /// Default host to use for new calls public var host: String - - public var connectivityState: ConnectivityState? { - return ConnectivityState.fromCEnum(cgrpc_channel_check_connectivity_state(underlyingChannel, 0)) - } + + /// Connectivity state observers + private var observers: [ConnectivityObserver] = [] /// Initializes a gRPC channel /// @@ -81,4 +80,145 @@ public class Channel { let underlyingCall = cgrpc_channel_create_call(underlyingChannel, method, host, timeout)! return Call(underlyingCall: underlyingCall, owned: true, completionQueue: completionQueue) } + + public func connectivityState(tryToConnect: Bool = false) -> ConnectivityState { + return ConnectivityState.connectivityState(cgrpc_channel_check_connectivity_state(underlyingChannel, tryToConnect ? 1 : 0)) + } + + public func subscribe(sourceState: ConnectivityState, tryToConnect: Bool = false, callback: @escaping (ConnectivityState) -> ()) { + var observer = observers.first(where: { $0.state == sourceState }) + + if observer == nil { + let newObserver = ConnectivityObserver(state: sourceState, underlyingChannel: underlyingChannel, tryToConnect: tryToConnect) + observers.append(newObserver) + observer = newObserver + } + + observer?.callbacks.append(callback) + observer?.polling = true + } +} + +private extension Channel { + class ConnectivityObserver: Equatable { + let state: ConnectivityState + let queue: CompletionQueue + let underlyingChannel: UnsafeMutableRawPointer + let underlyingCompletionQueue: UnsafeMutableRawPointer + private(set) var tryToConnect: Bool + var callbacks: [(ConnectivityState) -> ()] = [] + private var lastState: ConnectivityState + + var polling: Bool = false { + didSet { + if polling == true && oldValue == false { + run() + } + } + } + + init(state: ConnectivityState, underlyingChannel: UnsafeMutableRawPointer, tryToConnect: Bool) { + self.state = state + self.underlyingChannel = underlyingChannel + self.tryToConnect = tryToConnect + self.underlyingCompletionQueue = cgrpc_completion_queue_create_for_next() + self.queue = CompletionQueue(underlyingCompletionQueue: self.underlyingCompletionQueue, name: "Connectivity State") + self.lastState = ConnectivityState.connectivityState(cgrpc_channel_check_connectivity_state(self.underlyingChannel, 0)) + } + + deinit { + queue.shutdown() + } + + private func run() { + DispatchQueue.global().async { [weak self] in + guard let `self` = self, let underlyingState = self.lastState.underlyingState else { return } + + while self.polling { + guard !self.callbacks.isEmpty && !self.tryToConnect else { + self.polling = false + break + } + + defer { self.tryToConnect = false } + + let deadline: TimeInterval = 0.2 + cgrpc_channel_watch_connectivity_state(self.underlyingChannel, self.underlyingCompletionQueue, underlyingState, deadline, nil) + let event = self.queue.wait(timeout: deadline) + + if event.success == 1 || self.tryToConnect { + let newState = ConnectivityState.connectivityState(cgrpc_channel_check_connectivity_state(self.underlyingChannel, self.tryToConnect ? 1 : 0)) + + guard newState != self.lastState else { continue } + defer { self.lastState = newState } + + if self.lastState == self.state { + self.callbacks.forEach({ $0(newState) }) + } + } + } + } + } + + static func == (lhs: ConnectivityObserver, rhs: ConnectivityObserver) -> Bool { + return lhs.state == rhs.state + } + } +} + +extension Channel { + public enum ConnectivityState { + /// Channel has just been initialized + case initialized + /// Channel is idle + case idle + /// Channel is connecting + case connecting + /// Channel is ready for work + case ready + /// Channel has seen a failure but expects to recover + case transientFailure + /// Channel has seen a failure that it cannot recover from + case shutdown + /// Channel connectivity state is unknown + case unknown + + fileprivate static func connectivityState(_ value: grpc_connectivity_state) -> ConnectivityState { + switch value { + case GRPC_CHANNEL_INIT: + return .initialized + case GRPC_CHANNEL_IDLE: + return .idle + case GRPC_CHANNEL_CONNECTING: + return .connecting + case GRPC_CHANNEL_READY: + return .ready + case GRPC_CHANNEL_TRANSIENT_FAILURE: + return .transientFailure + case GRPC_CHANNEL_SHUTDOWN: + return .shutdown + default: + return .unknown + } + } + + fileprivate var underlyingState: grpc_connectivity_state? { + switch self { + case .initialized: + return GRPC_CHANNEL_INIT + case .idle: + return GRPC_CHANNEL_IDLE + case .connecting: + return GRPC_CHANNEL_CONNECTING + case .ready: + return GRPC_CHANNEL_READY + case .transientFailure: + return GRPC_CHANNEL_TRANSIENT_FAILURE + case .shutdown: + return GRPC_CHANNEL_SHUTDOWN + default: + return nil + } + } + } } diff --git a/Sources/SwiftGRPC/Core/ConnectivityState.swift b/Sources/SwiftGRPC/Core/ConnectivityState.swift deleted file mode 100644 index 58e6c01ca..000000000 --- a/Sources/SwiftGRPC/Core/ConnectivityState.swift +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Copyright 2018, gRPC Authors All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#if SWIFT_PACKAGE - import CgRPC -#endif -import Foundation - -public enum ConnectivityState: Int32, Error { - case initializing = -1 - case idle - case connecting - case ready - case transient_failure - case shutdown - - static func fromCEnum(_ connectivityState: grpc_connectivity_state) -> ConnectivityState? { - return ConnectivityState(rawValue: connectivityState.rawValue) - } -} From 76faf092af15300562595a36d6f15de096e6c35e Mon Sep 17 00:00:00 2001 From: Chris Vanderschuere Date: Mon, 26 Mar 2018 21:38:27 -0700 Subject: [PATCH 2/8] Explicitly import dispatch --- Sources/SwiftGRPC/Core/Channel.swift | 1 + 1 file changed, 1 insertion(+) diff --git a/Sources/SwiftGRPC/Core/Channel.swift b/Sources/SwiftGRPC/Core/Channel.swift index f04a9b0e5..301ae5973 100644 --- a/Sources/SwiftGRPC/Core/Channel.swift +++ b/Sources/SwiftGRPC/Core/Channel.swift @@ -15,6 +15,7 @@ */ #if SWIFT_PACKAGE import CgRPC + import Dispatch #endif import Foundation From 255ce888d03ad1aa1627352590e5b3c8dd678bb6 Mon Sep 17 00:00:00 2001 From: Sebastian Thiebaud Date: Mon, 21 May 2018 14:11:21 -0700 Subject: [PATCH 3/8] PR changes --- Sources/CgRPC/shim/cgrpc.h | 2 +- Sources/CgRPC/shim/channel.c | 2 +- Sources/SwiftGRPC/Core/Channel.swift | 91 +++++++++++++--------------- 3 files changed, 44 insertions(+), 51 deletions(-) diff --git a/Sources/CgRPC/shim/cgrpc.h b/Sources/CgRPC/shim/cgrpc.h index 6f1929d08..647cbb547 100644 --- a/Sources/CgRPC/shim/cgrpc.h +++ b/Sources/CgRPC/shim/cgrpc.h @@ -148,7 +148,7 @@ cgrpc_completion_queue *cgrpc_channel_completion_queue(cgrpc_channel *channel); grpc_connectivity_state cgrpc_channel_check_connectivity_state( cgrpc_channel *channel, int try_to_connect); void cgrpc_channel_watch_connectivity_state(cgrpc_channel *channel, - cgrpc_completion_queue * completion_queue, + cgrpc_completion_queue *completion_queue, grpc_connectivity_state last_observed_state, double deadline, void *tag); diff --git a/Sources/CgRPC/shim/channel.c b/Sources/CgRPC/shim/channel.c index 1d4cb34dc..306caa2d2 100644 --- a/Sources/CgRPC/shim/channel.c +++ b/Sources/CgRPC/shim/channel.c @@ -104,7 +104,7 @@ grpc_connectivity_state cgrpc_channel_check_connectivity_state(cgrpc_channel *ch return grpc_channel_check_connectivity_state(channel->channel, try_to_connect); } -void cgrpc_channel_watch_connectivity_state(cgrpc_channel *channel, cgrpc_completion_queue * completion_queue, grpc_connectivity_state last_observed_state, double deadline, void *tag) { +void cgrpc_channel_watch_connectivity_state(cgrpc_channel *channel, cgrpc_completion_queue *completion_queue, grpc_connectivity_state last_observed_state, double deadline, void *tag) { gpr_timespec deadline_seconds = cgrpc_deadline_in_seconds_from_now(deadline); return grpc_channel_watch_connectivity_state(channel->channel, last_observed_state, deadline_seconds, completion_queue, tag); } diff --git a/Sources/SwiftGRPC/Core/Channel.swift b/Sources/SwiftGRPC/Core/Channel.swift index 301ae5973..6558102e1 100644 --- a/Sources/SwiftGRPC/Core/Channel.swift +++ b/Sources/SwiftGRPC/Core/Channel.swift @@ -34,7 +34,7 @@ public class Channel { public var host: String /// Connectivity state observers - private var observers: [ConnectivityObserver] = [] + private var connectivityObservers: [ConnectivityObserver] = [] /// Initializes a gRPC channel /// @@ -47,8 +47,7 @@ public class Channel { } else { underlyingChannel = cgrpc_channel_create(address) } - completionQueue = CompletionQueue( - underlyingCompletionQueue: cgrpc_channel_completion_queue(underlyingChannel), name: "Client") + completionQueue = CompletionQueue(underlyingCompletionQueue: cgrpc_channel_completion_queue(underlyingChannel), name: "Client") completionQueue.run() // start a loop that watches the channel's completion queue } @@ -59,13 +58,13 @@ public class Channel { /// - Parameter host: an optional hostname override public init(address: String, certificates: String, host: String?) { self.host = address - underlyingChannel = cgrpc_channel_create_secure(address, certificates, host) - completionQueue = CompletionQueue( - underlyingCompletionQueue: cgrpc_channel_completion_queue(underlyingChannel), name: "Client") + underlyingChannel = cgrpc_channel_create_secure(address, certificates, &argumentValues, Int32(arguments.count)) + completionQueue = CompletionQueue(underlyingCompletionQueue: cgrpc_channel_completion_queue(underlyingChannel), name: "Client") completionQueue.run() // start a loop that watches the channel's completion queue } deinit { + connectivityObservers.forEach { $0.polling = false } cgrpc_channel_destroy(underlyingChannel) completionQueue.shutdown() } @@ -86,83 +85,77 @@ public class Channel { return ConnectivityState.connectivityState(cgrpc_channel_check_connectivity_state(underlyingChannel, tryToConnect ? 1 : 0)) } - public func subscribe(sourceState: ConnectivityState, tryToConnect: Bool = false, callback: @escaping (ConnectivityState) -> ()) { - var observer = observers.first(where: { $0.state == sourceState }) - - if observer == nil { - let newObserver = ConnectivityObserver(state: sourceState, underlyingChannel: underlyingChannel, tryToConnect: tryToConnect) - observers.append(newObserver) - observer = newObserver - } - - observer?.callbacks.append(callback) - observer?.polling = true + public func subscribe(callback: @escaping (ConnectivityState) -> ()) { + let observer = ConnectivityObserver(underlyingChannel: underlyingChannel, callback: callback) + observer.polling = true + connectivityObservers.append(observer) } } private extension Channel { - class ConnectivityObserver: Equatable { - let state: ConnectivityState - let queue: CompletionQueue - let underlyingChannel: UnsafeMutableRawPointer - let underlyingCompletionQueue: UnsafeMutableRawPointer - private(set) var tryToConnect: Bool - var callbacks: [(ConnectivityState) -> ()] = [] + class ConnectivityObserver { + private let completionQueue: CompletionQueue + private let underlyingChannel: UnsafeMutableRawPointer + private let underlyingCompletionQueue: UnsafeMutableRawPointer + private let callback: (ConnectivityState) -> () private var lastState: ConnectivityState - + private let queue: OperationQueue + var polling: Bool = false { didSet { - if polling == true && oldValue == false { - run() + queue.addOperation { [weak self] in + guard let `self` = self else { return } + + if self.polling == true && oldValue == false { + self.run() + } else if self.polling == false && oldValue == true { + self.shutdown() + } } } } - init(state: ConnectivityState, underlyingChannel: UnsafeMutableRawPointer, tryToConnect: Bool) { - self.state = state + init(underlyingChannel: UnsafeMutableRawPointer, callback: @escaping (ConnectivityState) -> ()) { self.underlyingChannel = underlyingChannel - self.tryToConnect = tryToConnect self.underlyingCompletionQueue = cgrpc_completion_queue_create_for_next() - self.queue = CompletionQueue(underlyingCompletionQueue: self.underlyingCompletionQueue, name: "Connectivity State") + self.completionQueue = CompletionQueue(underlyingCompletionQueue: self.underlyingCompletionQueue, name: "Connectivity State") + self.callback = callback self.lastState = ConnectivityState.connectivityState(cgrpc_channel_check_connectivity_state(self.underlyingChannel, 0)) + + queue = OperationQueue() + queue.maxConcurrentOperationCount = 1 + queue.qualityOfService = .background } deinit { - queue.shutdown() + shutdown() } private func run() { DispatchQueue.global().async { [weak self] in - guard let `self` = self, let underlyingState = self.lastState.underlyingState else { return } + guard let `self` = self else { return } while self.polling { - guard !self.callbacks.isEmpty && !self.tryToConnect else { - self.polling = false - break - } - - defer { self.tryToConnect = false } - + guard let underlyingState = self.lastState.underlyingState else { return } + let deadline: TimeInterval = 0.2 cgrpc_channel_watch_connectivity_state(self.underlyingChannel, self.underlyingCompletionQueue, underlyingState, deadline, nil) - let event = self.queue.wait(timeout: deadline) + let event = self.completionQueue.wait(timeout: deadline) - if event.success == 1 || self.tryToConnect { - let newState = ConnectivityState.connectivityState(cgrpc_channel_check_connectivity_state(self.underlyingChannel, self.tryToConnect ? 1 : 0)) + if event.success == 1 { + let newState = ConnectivityState.connectivityState(cgrpc_channel_check_connectivity_state(self.underlyingChannel, 1)) guard newState != self.lastState else { continue } defer { self.lastState = newState } - if self.lastState == self.state { - self.callbacks.forEach({ $0(newState) }) - } + self.callback(newState) } } } } - - static func == (lhs: ConnectivityObserver, rhs: ConnectivityObserver) -> Bool { - return lhs.state == rhs.state + + private func shutdown() { + completionQueue.shutdown() } } } From 08914828ecd2aceb6e0e8ebd792ec8757f8eb455 Mon Sep 17 00:00:00 2001 From: Sebastian Thiebaud Date: Tue, 22 May 2018 11:17:51 -0700 Subject: [PATCH 4/8] PR changes --- Sources/SwiftGRPC/Core/Channel.swift | 71 ++++++++++++---------------- 1 file changed, 30 insertions(+), 41 deletions(-) diff --git a/Sources/SwiftGRPC/Core/Channel.swift b/Sources/SwiftGRPC/Core/Channel.swift index 5909a5f61..c997ad7a5 100644 --- a/Sources/SwiftGRPC/Core/Channel.swift +++ b/Sources/SwiftGRPC/Core/Channel.swift @@ -73,7 +73,6 @@ public class Channel { } deinit { - connectivityObservers.forEach { $0.polling = false } cgrpc_channel_destroy(underlyingChannel) completionQueue.shutdown() } @@ -94,9 +93,8 @@ public class Channel { return ConnectivityState.connectivityState(cgrpc_channel_check_connectivity_state(underlyingChannel, tryToConnect ? 1 : 0)) } - public func subscribe(callback: @escaping (ConnectivityState) -> ()) { - let observer = ConnectivityObserver(underlyingChannel: underlyingChannel, callback: callback) - observer.polling = true + public func subscribe(callback: @escaping (ConnectivityState) -> Void) { + let observer = ConnectivityObserver(underlyingChannel: underlyingChannel, currentState: connectivityState(), callback: callback) connectivityObservers.append(observer) } } @@ -106,66 +104,57 @@ private extension Channel { private let completionQueue: CompletionQueue private let underlyingChannel: UnsafeMutableRawPointer private let underlyingCompletionQueue: UnsafeMutableRawPointer - private let callback: (ConnectivityState) -> () + private let callback: (ConnectivityState) -> Void private var lastState: ConnectivityState - private let queue: OperationQueue - var polling: Bool = false { - didSet { - queue.addOperation { [weak self] in - guard let `self` = self else { return } - - if self.polling == true && oldValue == false { - self.run() - } else if self.polling == false && oldValue == true { - self.shutdown() - } - } - } - } - - init(underlyingChannel: UnsafeMutableRawPointer, callback: @escaping (ConnectivityState) -> ()) { + init(underlyingChannel: UnsafeMutableRawPointer, currentState: ConnectivityState, callback: @escaping (ConnectivityState) -> ()) { self.underlyingChannel = underlyingChannel self.underlyingCompletionQueue = cgrpc_completion_queue_create_for_next() self.completionQueue = CompletionQueue(underlyingCompletionQueue: self.underlyingCompletionQueue, name: "Connectivity State") self.callback = callback - self.lastState = ConnectivityState.connectivityState(cgrpc_channel_check_connectivity_state(self.underlyingChannel, 0)) - - queue = OperationQueue() - queue.maxConcurrentOperationCount = 1 - queue.qualityOfService = .background + self.lastState = currentState + run() } deinit { - shutdown() + completionQueue.shutdown() } private func run() { - DispatchQueue.global().async { [weak self] in + let spinloopThreadQueue = DispatchQueue(label: "SwiftGRPC.ConnectivityObserver.run.spinloopThread") + + spinloopThreadQueue.async { [weak self] in guard let `self` = self else { return } - while self.polling { - guard let underlyingState = self.lastState.underlyingState else { return } + spinloop: while true { + guard let underlyingState = self.lastState.underlyingState else { + print("Couldn't retrieve `underlyingState`") + return + } let deadline: TimeInterval = 0.2 cgrpc_channel_watch_connectivity_state(self.underlyingChannel, self.underlyingCompletionQueue, underlyingState, deadline, nil) let event = self.completionQueue.wait(timeout: deadline) - if event.success == 1 { - let newState = ConnectivityState.connectivityState(cgrpc_channel_check_connectivity_state(self.underlyingChannel, 1)) - - guard newState != self.lastState else { continue } - defer { self.lastState = newState } - - self.callback(newState) + switch event.type { + case .complete: + let newState = ConnectivityState.connectivityState(cgrpc_channel_check_connectivity_state(self.underlyingChannel, 0)) + + if newState != self.lastState { + self.callback(newState) + } + + self.lastState = newState + case .queueShutdown: + break spinloop + case .queueTimeout: + continue spinloop + default: + break spinloop } } } } - - private func shutdown() { - completionQueue.shutdown() - } } } From e8530326a8fb1b795bd4e850323b863a2fa3116a Mon Sep 17 00:00:00 2001 From: Sebastian Thiebaud Date: Tue, 22 May 2018 11:23:30 -0700 Subject: [PATCH 5/8] Add comments --- Sources/SwiftGRPC/Core/Channel.swift | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/Sources/SwiftGRPC/Core/Channel.swift b/Sources/SwiftGRPC/Core/Channel.swift index c997ad7a5..c24dea80c 100644 --- a/Sources/SwiftGRPC/Core/Channel.swift +++ b/Sources/SwiftGRPC/Core/Channel.swift @@ -89,10 +89,17 @@ public class Channel { return Call(underlyingCall: underlyingCall, owned: true, completionQueue: completionQueue) } + /// Check the current connectivity state + /// + /// - Parameter tryToConnect: boolean value to indicate if should try to connect if channel's connectivity state is idle + /// - Returns: a ConnectivityState value representing the current connectivity state of the channel public func connectivityState(tryToConnect: Bool = false) -> ConnectivityState { return ConnectivityState.connectivityState(cgrpc_channel_check_connectivity_state(underlyingChannel, tryToConnect ? 1 : 0)) } + /// Subscribe to connectivity state changes + /// + /// - Parameter callback: block executed every time a new connectivity state is detected public func subscribe(callback: @escaping (ConnectivityState) -> Void) { let observer = ConnectivityObserver(underlyingChannel: underlyingChannel, currentState: connectivityState(), callback: callback) connectivityObservers.append(observer) From 4ca615a5c3f5359450a5c110f6423dbacf42b6ff Mon Sep 17 00:00:00 2001 From: Sebastian Thiebaud Date: Tue, 22 May 2018 11:45:20 -0700 Subject: [PATCH 6/8] PR changes --- Sources/SwiftGRPC/Core/Channel.swift | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/Sources/SwiftGRPC/Core/Channel.swift b/Sources/SwiftGRPC/Core/Channel.swift index c24dea80c..ed581585a 100644 --- a/Sources/SwiftGRPC/Core/Channel.swift +++ b/Sources/SwiftGRPC/Core/Channel.swift @@ -133,7 +133,7 @@ private extension Channel { spinloopThreadQueue.async { [weak self] in guard let `self` = self else { return } - spinloop: while true { + while true { guard let underlyingState = self.lastState.underlyingState else { print("Couldn't retrieve `underlyingState`") return @@ -152,12 +152,13 @@ private extension Channel { } self.lastState = newState - case .queueShutdown: - break spinloop case .queueTimeout: - continue spinloop + continue + case .queueShutdown: + return default: - break spinloop + print("Event's completion type is `unknown`") + continue } } } From ad6e02259d8ae8d40b42f2031b91c0e730c1d8ba Mon Sep 17 00:00:00 2001 From: Sebastian Thiebaud Date: Tue, 22 May 2018 13:05:51 -0700 Subject: [PATCH 7/8] Remove print statements --- Sources/SwiftGRPC/Core/Channel.swift | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/Sources/SwiftGRPC/Core/Channel.swift b/Sources/SwiftGRPC/Core/Channel.swift index ed581585a..be8d7640b 100644 --- a/Sources/SwiftGRPC/Core/Channel.swift +++ b/Sources/SwiftGRPC/Core/Channel.swift @@ -134,10 +134,7 @@ private extension Channel { guard let `self` = self else { return } while true { - guard let underlyingState = self.lastState.underlyingState else { - print("Couldn't retrieve `underlyingState`") - return - } + guard let underlyingState = self.lastState.underlyingState else { return } let deadline: TimeInterval = 0.2 cgrpc_channel_watch_connectivity_state(self.underlyingChannel, self.underlyingCompletionQueue, underlyingState, deadline, nil) @@ -157,7 +154,6 @@ private extension Channel { case .queueShutdown: return default: - print("Event's completion type is `unknown`") continue } } From 0d25ef84b2a5902ec4486a176343039ae7e274ac Mon Sep 17 00:00:00 2001 From: Sebastian Thiebaud Date: Wed, 23 May 2018 09:38:34 -0700 Subject: [PATCH 8/8] PR changes --- Sources/SwiftGRPC/Core/Channel.swift | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/Sources/SwiftGRPC/Core/Channel.swift b/Sources/SwiftGRPC/Core/Channel.swift index be8d7640b..5cd49e753 100644 --- a/Sources/SwiftGRPC/Core/Channel.swift +++ b/Sources/SwiftGRPC/Core/Channel.swift @@ -73,6 +73,7 @@ public class Channel { } deinit { + connectivityObservers.forEach { $0.shutdown() } cgrpc_channel_destroy(underlyingChannel) completionQueue.shutdown() } @@ -101,8 +102,7 @@ public class Channel { /// /// - Parameter callback: block executed every time a new connectivity state is detected public func subscribe(callback: @escaping (ConnectivityState) -> Void) { - let observer = ConnectivityObserver(underlyingChannel: underlyingChannel, currentState: connectivityState(), callback: callback) - connectivityObservers.append(observer) + connectivityObservers.append(ConnectivityObserver(underlyingChannel: underlyingChannel, currentState: connectivityState(), callback: callback)) } } @@ -124,15 +124,13 @@ private extension Channel { } deinit { - completionQueue.shutdown() + shutdown() } private func run() { let spinloopThreadQueue = DispatchQueue(label: "SwiftGRPC.ConnectivityObserver.run.spinloopThread") - spinloopThreadQueue.async { [weak self] in - guard let `self` = self else { return } - + spinloopThreadQueue.async { while true { guard let underlyingState = self.lastState.underlyingState else { return } @@ -159,6 +157,10 @@ private extension Channel { } } } + + func shutdown() { + completionQueue.shutdown() + } } }