diff --git a/Sources/CgRPC/shim/cgrpc.h b/Sources/CgRPC/shim/cgrpc.h index 38015c0a0..8370db938 100644 --- a/Sources/CgRPC/shim/cgrpc.h +++ b/Sources/CgRPC/shim/cgrpc.h @@ -178,6 +178,11 @@ cgrpc_completion_queue *cgrpc_channel_completion_queue(cgrpc_channel *channel); grpc_connectivity_state cgrpc_channel_check_connectivity_state( cgrpc_channel *channel, int try_to_connect); +void cgrpc_channel_watch_connectivity_state(cgrpc_channel *channel, + cgrpc_completion_queue *completion_queue, + grpc_connectivity_state last_observed_state, + double deadline, + void *tag); // server support cgrpc_server *cgrpc_server_create(const char *address); @@ -190,6 +195,7 @@ void cgrpc_server_start(cgrpc_server *s); cgrpc_completion_queue *cgrpc_server_get_completion_queue(cgrpc_server *s); // completion queues +cgrpc_completion_queue *cgrpc_completion_queue_create_for_next(); grpc_event cgrpc_completion_queue_get_next_event(cgrpc_completion_queue *cq, double timeout); void cgrpc_completion_queue_drain(cgrpc_completion_queue *cq); diff --git a/Sources/CgRPC/shim/channel.c b/Sources/CgRPC/shim/channel.c index a03131550..babceef57 100644 --- a/Sources/CgRPC/shim/channel.c +++ b/Sources/CgRPC/shim/channel.c @@ -92,3 +92,8 @@ cgrpc_completion_queue *cgrpc_channel_completion_queue(cgrpc_channel *channel) { grpc_connectivity_state cgrpc_channel_check_connectivity_state(cgrpc_channel *channel, int try_to_connect) { return grpc_channel_check_connectivity_state(channel->channel, try_to_connect); } + +void cgrpc_channel_watch_connectivity_state(cgrpc_channel *channel, cgrpc_completion_queue *completion_queue, grpc_connectivity_state last_observed_state, double deadline, void *tag) { + gpr_timespec deadline_seconds = cgrpc_deadline_in_seconds_from_now(deadline); + return grpc_channel_watch_connectivity_state(channel->channel, last_observed_state, deadline_seconds, completion_queue, tag); +} diff --git a/Sources/CgRPC/shim/completion_queue.c b/Sources/CgRPC/shim/completion_queue.c index 726f9a4ed..7c5fa1351 100644 --- a/Sources/CgRPC/shim/completion_queue.c +++ b/Sources/CgRPC/shim/completion_queue.c @@ -18,6 +18,10 @@ #include +grpc_completion_queue *cgrpc_completion_queue_create_for_next() { + return grpc_completion_queue_create_for_next(NULL); +} + grpc_event cgrpc_completion_queue_get_next_event(grpc_completion_queue *cq, double timeout) { gpr_timespec deadline = cgrpc_deadline_in_seconds_from_now(timeout); if (timeout < 0) { diff --git a/Sources/SwiftGRPC/Core/Channel.swift b/Sources/SwiftGRPC/Core/Channel.swift index 79507b4e4..5cd49e753 100644 --- a/Sources/SwiftGRPC/Core/Channel.swift +++ b/Sources/SwiftGRPC/Core/Channel.swift @@ -15,6 +15,7 @@ */ #if SWIFT_PACKAGE import CgRPC + import Dispatch #endif import Foundation @@ -31,10 +32,9 @@ public class Channel { /// Default host to use for new calls public var host: String - - public var connectivityState: ConnectivityState? { - return ConnectivityState.fromCEnum(cgrpc_channel_check_connectivity_state(underlyingChannel, 0)) - } + + /// Connectivity state observers + private var connectivityObservers: [ConnectivityObserver] = [] /// Initializes a gRPC channel /// @@ -52,8 +52,7 @@ public class Channel { } else { underlyingChannel = cgrpc_channel_create(address, &argumentValues, Int32(arguments.count)) } - completionQueue = CompletionQueue( - underlyingCompletionQueue: cgrpc_channel_completion_queue(underlyingChannel), name: "Client") + completionQueue = CompletionQueue(underlyingCompletionQueue: cgrpc_channel_completion_queue(underlyingChannel), name: "Client") completionQueue.run() // start a loop that watches the channel's completion queue } @@ -64,17 +63,17 @@ public class Channel { /// - Parameter arguments: list of channel configuration options public init(address: String, certificates: String, arguments: [Argument] = []) { gRPC.initialize() - self.host = address + host = address let argumentWrappers = arguments.map { $0.toCArg() } var argumentValues = argumentWrappers.map { $0.wrapped } underlyingChannel = cgrpc_channel_create_secure(address, certificates, &argumentValues, Int32(arguments.count)) - completionQueue = CompletionQueue( - underlyingCompletionQueue: cgrpc_channel_completion_queue(underlyingChannel), name: "Client") + completionQueue = CompletionQueue(underlyingCompletionQueue: cgrpc_channel_completion_queue(underlyingChannel), name: "Client") completionQueue.run() // start a loop that watches the channel's completion queue } deinit { + connectivityObservers.forEach { $0.shutdown() } cgrpc_channel_destroy(underlyingChannel) completionQueue.shutdown() } @@ -90,4 +89,134 @@ public class Channel { let underlyingCall = cgrpc_channel_create_call(underlyingChannel, method, host, timeout)! return Call(underlyingCall: underlyingCall, owned: true, completionQueue: completionQueue) } + + /// Check the current connectivity state + /// + /// - Parameter tryToConnect: boolean value to indicate if should try to connect if channel's connectivity state is idle + /// - Returns: a ConnectivityState value representing the current connectivity state of the channel + public func connectivityState(tryToConnect: Bool = false) -> ConnectivityState { + return ConnectivityState.connectivityState(cgrpc_channel_check_connectivity_state(underlyingChannel, tryToConnect ? 1 : 0)) + } + + /// Subscribe to connectivity state changes + /// + /// - Parameter callback: block executed every time a new connectivity state is detected + public func subscribe(callback: @escaping (ConnectivityState) -> Void) { + connectivityObservers.append(ConnectivityObserver(underlyingChannel: underlyingChannel, currentState: connectivityState(), callback: callback)) + } +} + +private extension Channel { + class ConnectivityObserver { + private let completionQueue: CompletionQueue + private let underlyingChannel: UnsafeMutableRawPointer + private let underlyingCompletionQueue: UnsafeMutableRawPointer + private let callback: (ConnectivityState) -> Void + private var lastState: ConnectivityState + + init(underlyingChannel: UnsafeMutableRawPointer, currentState: ConnectivityState, callback: @escaping (ConnectivityState) -> ()) { + self.underlyingChannel = underlyingChannel + self.underlyingCompletionQueue = cgrpc_completion_queue_create_for_next() + self.completionQueue = CompletionQueue(underlyingCompletionQueue: self.underlyingCompletionQueue, name: "Connectivity State") + self.callback = callback + self.lastState = currentState + run() + } + + deinit { + shutdown() + } + + private func run() { + let spinloopThreadQueue = DispatchQueue(label: "SwiftGRPC.ConnectivityObserver.run.spinloopThread") + + spinloopThreadQueue.async { + while true { + guard let underlyingState = self.lastState.underlyingState else { return } + + let deadline: TimeInterval = 0.2 + cgrpc_channel_watch_connectivity_state(self.underlyingChannel, self.underlyingCompletionQueue, underlyingState, deadline, nil) + let event = self.completionQueue.wait(timeout: deadline) + + switch event.type { + case .complete: + let newState = ConnectivityState.connectivityState(cgrpc_channel_check_connectivity_state(self.underlyingChannel, 0)) + + if newState != self.lastState { + self.callback(newState) + } + + self.lastState = newState + case .queueTimeout: + continue + case .queueShutdown: + return + default: + continue + } + } + } + } + + func shutdown() { + completionQueue.shutdown() + } + } +} + +extension Channel { + public enum ConnectivityState { + /// Channel has just been initialized + case initialized + /// Channel is idle + case idle + /// Channel is connecting + case connecting + /// Channel is ready for work + case ready + /// Channel has seen a failure but expects to recover + case transientFailure + /// Channel has seen a failure that it cannot recover from + case shutdown + /// Channel connectivity state is unknown + case unknown + + fileprivate static func connectivityState(_ value: grpc_connectivity_state) -> ConnectivityState { + switch value { + case GRPC_CHANNEL_INIT: + return .initialized + case GRPC_CHANNEL_IDLE: + return .idle + case GRPC_CHANNEL_CONNECTING: + return .connecting + case GRPC_CHANNEL_READY: + return .ready + case GRPC_CHANNEL_TRANSIENT_FAILURE: + return .transientFailure + case GRPC_CHANNEL_SHUTDOWN: + return .shutdown + default: + return .unknown + } + } + + fileprivate var underlyingState: grpc_connectivity_state? { + switch self { + case .initialized: + return GRPC_CHANNEL_INIT + case .idle: + return GRPC_CHANNEL_IDLE + case .connecting: + return GRPC_CHANNEL_CONNECTING + case .ready: + return GRPC_CHANNEL_READY + case .transientFailure: + return GRPC_CHANNEL_TRANSIENT_FAILURE + case .shutdown: + return GRPC_CHANNEL_SHUTDOWN + default: + return nil + } + } + } } diff --git a/Sources/SwiftGRPC/Core/ConnectivityState.swift b/Sources/SwiftGRPC/Core/ConnectivityState.swift deleted file mode 100644 index 58e6c01ca..000000000 --- a/Sources/SwiftGRPC/Core/ConnectivityState.swift +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Copyright 2018, gRPC Authors All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#if SWIFT_PACKAGE - import CgRPC -#endif -import Foundation - -public enum ConnectivityState: Int32, Error { - case initializing = -1 - case idle - case connecting - case ready - case transient_failure - case shutdown - - static func fromCEnum(_ connectivityState: grpc_connectivity_state) -> ConnectivityState? { - return ConnectivityState(rawValue: connectivityState.rawValue) - } -}