From 0bd14d84f14b1b3730afdbe142c1cc73f8450a47 Mon Sep 17 00:00:00 2001 From: Christopher Vanderschuere Date: Sun, 25 Mar 2018 11:38:53 -0700 Subject: [PATCH] Implement connectivity state property and observers (#8) --- Sources/CgRPC/shim/cgrpc.h | 6 + Sources/CgRPC/shim/channel.c | 5 + Sources/CgRPC/shim/completion_queue.c | 4 + Sources/SwiftGRPC/Core/Channel.swift | 148 +++++++++++++++++- .../SwiftGRPC/Core/ConnectivityState.swift | 32 ---- 5 files changed, 159 insertions(+), 36 deletions(-) delete mode 100644 Sources/SwiftGRPC/Core/ConnectivityState.swift diff --git a/Sources/CgRPC/shim/cgrpc.h b/Sources/CgRPC/shim/cgrpc.h index 87a82670e..6f1929d08 100644 --- a/Sources/CgRPC/shim/cgrpc.h +++ b/Sources/CgRPC/shim/cgrpc.h @@ -147,6 +147,11 @@ cgrpc_completion_queue *cgrpc_channel_completion_queue(cgrpc_channel *channel); grpc_connectivity_state cgrpc_channel_check_connectivity_state( cgrpc_channel *channel, int try_to_connect); +void cgrpc_channel_watch_connectivity_state(cgrpc_channel *channel, + cgrpc_completion_queue * completion_queue, + grpc_connectivity_state last_observed_state, + double deadline, + void *tag); // server support cgrpc_server *cgrpc_server_create(const char *address); @@ -159,6 +164,7 @@ void cgrpc_server_start(cgrpc_server *s); cgrpc_completion_queue *cgrpc_server_get_completion_queue(cgrpc_server *s); // completion queues +cgrpc_completion_queue *cgrpc_completion_queue_create_for_next(); grpc_event cgrpc_completion_queue_get_next_event(cgrpc_completion_queue *cq, double timeout); void cgrpc_completion_queue_drain(cgrpc_completion_queue *cq); diff --git a/Sources/CgRPC/shim/channel.c b/Sources/CgRPC/shim/channel.c index 3ddcba0c2..1d4cb34dc 100644 --- a/Sources/CgRPC/shim/channel.c +++ b/Sources/CgRPC/shim/channel.c @@ -103,3 +103,8 @@ cgrpc_completion_queue *cgrpc_channel_completion_queue(cgrpc_channel *channel) { grpc_connectivity_state cgrpc_channel_check_connectivity_state(cgrpc_channel *channel, int try_to_connect) { return grpc_channel_check_connectivity_state(channel->channel, try_to_connect); } + +void cgrpc_channel_watch_connectivity_state(cgrpc_channel *channel, cgrpc_completion_queue * completion_queue, grpc_connectivity_state last_observed_state, double deadline, void *tag) { + gpr_timespec deadline_seconds = cgrpc_deadline_in_seconds_from_now(deadline); + return grpc_channel_watch_connectivity_state(channel->channel, last_observed_state, deadline_seconds, completion_queue, tag); +} diff --git a/Sources/CgRPC/shim/completion_queue.c b/Sources/CgRPC/shim/completion_queue.c index 726f9a4ed..7c5fa1351 100644 --- a/Sources/CgRPC/shim/completion_queue.c +++ b/Sources/CgRPC/shim/completion_queue.c @@ -18,6 +18,10 @@ #include +grpc_completion_queue *cgrpc_completion_queue_create_for_next() { + return grpc_completion_queue_create_for_next(NULL); +} + grpc_event cgrpc_completion_queue_get_next_event(grpc_completion_queue *cq, double timeout) { gpr_timespec deadline = cgrpc_deadline_in_seconds_from_now(timeout); if (timeout < 0) { diff --git a/Sources/SwiftGRPC/Core/Channel.swift b/Sources/SwiftGRPC/Core/Channel.swift index f0ca4c61f..f04a9b0e5 100644 --- a/Sources/SwiftGRPC/Core/Channel.swift +++ b/Sources/SwiftGRPC/Core/Channel.swift @@ -31,10 +31,9 @@ public class Channel { /// Default host to use for new calls public var host: String - - public var connectivityState: ConnectivityState? { - return ConnectivityState.fromCEnum(cgrpc_channel_check_connectivity_state(underlyingChannel, 0)) - } + + /// Connectivity state observers + private var observers: [ConnectivityObserver] = [] /// Initializes a gRPC channel /// @@ -81,4 +80,145 @@ public class Channel { let underlyingCall = cgrpc_channel_create_call(underlyingChannel, method, host, timeout)! return Call(underlyingCall: underlyingCall, owned: true, completionQueue: completionQueue) } + + public func connectivityState(tryToConnect: Bool = false) -> ConnectivityState { + return ConnectivityState.connectivityState(cgrpc_channel_check_connectivity_state(underlyingChannel, tryToConnect ? 1 : 0)) + } + + public func subscribe(sourceState: ConnectivityState, tryToConnect: Bool = false, callback: @escaping (ConnectivityState) -> ()) { + var observer = observers.first(where: { $0.state == sourceState }) + + if observer == nil { + let newObserver = ConnectivityObserver(state: sourceState, underlyingChannel: underlyingChannel, tryToConnect: tryToConnect) + observers.append(newObserver) + observer = newObserver + } + + observer?.callbacks.append(callback) + observer?.polling = true + } +} + +private extension Channel { + class ConnectivityObserver: Equatable { + let state: ConnectivityState + let queue: CompletionQueue + let underlyingChannel: UnsafeMutableRawPointer + let underlyingCompletionQueue: UnsafeMutableRawPointer + private(set) var tryToConnect: Bool + var callbacks: [(ConnectivityState) -> ()] = [] + private var lastState: ConnectivityState + + var polling: Bool = false { + didSet { + if polling == true && oldValue == false { + run() + } + } + } + + init(state: ConnectivityState, underlyingChannel: UnsafeMutableRawPointer, tryToConnect: Bool) { + self.state = state + self.underlyingChannel = underlyingChannel + self.tryToConnect = tryToConnect + self.underlyingCompletionQueue = cgrpc_completion_queue_create_for_next() + self.queue = CompletionQueue(underlyingCompletionQueue: self.underlyingCompletionQueue, name: "Connectivity State") + self.lastState = ConnectivityState.connectivityState(cgrpc_channel_check_connectivity_state(self.underlyingChannel, 0)) + } + + deinit { + queue.shutdown() + } + + private func run() { + DispatchQueue.global().async { [weak self] in + guard let `self` = self, let underlyingState = self.lastState.underlyingState else { return } + + while self.polling { + guard !self.callbacks.isEmpty && !self.tryToConnect else { + self.polling = false + break + } + + defer { self.tryToConnect = false } + + let deadline: TimeInterval = 0.2 + cgrpc_channel_watch_connectivity_state(self.underlyingChannel, self.underlyingCompletionQueue, underlyingState, deadline, nil) + let event = self.queue.wait(timeout: deadline) + + if event.success == 1 || self.tryToConnect { + let newState = ConnectivityState.connectivityState(cgrpc_channel_check_connectivity_state(self.underlyingChannel, self.tryToConnect ? 1 : 0)) + + guard newState != self.lastState else { continue } + defer { self.lastState = newState } + + if self.lastState == self.state { + self.callbacks.forEach({ $0(newState) }) + } + } + } + } + } + + static func == (lhs: ConnectivityObserver, rhs: ConnectivityObserver) -> Bool { + return lhs.state == rhs.state + } + } +} + +extension Channel { + public enum ConnectivityState { + /// Channel has just been initialized + case initialized + /// Channel is idle + case idle + /// Channel is connecting + case connecting + /// Channel is ready for work + case ready + /// Channel has seen a failure but expects to recover + case transientFailure + /// Channel has seen a failure that it cannot recover from + case shutdown + /// Channel connectivity state is unknown + case unknown + + fileprivate static func connectivityState(_ value: grpc_connectivity_state) -> ConnectivityState { + switch value { + case GRPC_CHANNEL_INIT: + return .initialized + case GRPC_CHANNEL_IDLE: + return .idle + case GRPC_CHANNEL_CONNECTING: + return .connecting + case GRPC_CHANNEL_READY: + return .ready + case GRPC_CHANNEL_TRANSIENT_FAILURE: + return .transientFailure + case GRPC_CHANNEL_SHUTDOWN: + return .shutdown + default: + return .unknown + } + } + + fileprivate var underlyingState: grpc_connectivity_state? { + switch self { + case .initialized: + return GRPC_CHANNEL_INIT + case .idle: + return GRPC_CHANNEL_IDLE + case .connecting: + return GRPC_CHANNEL_CONNECTING + case .ready: + return GRPC_CHANNEL_READY + case .transientFailure: + return GRPC_CHANNEL_TRANSIENT_FAILURE + case .shutdown: + return GRPC_CHANNEL_SHUTDOWN + default: + return nil + } + } + } } diff --git a/Sources/SwiftGRPC/Core/ConnectivityState.swift b/Sources/SwiftGRPC/Core/ConnectivityState.swift deleted file mode 100644 index 58e6c01ca..000000000 --- a/Sources/SwiftGRPC/Core/ConnectivityState.swift +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Copyright 2018, gRPC Authors All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#if SWIFT_PACKAGE - import CgRPC -#endif -import Foundation - -public enum ConnectivityState: Int32, Error { - case initializing = -1 - case idle - case connecting - case ready - case transient_failure - case shutdown - - static func fromCEnum(_ connectivityState: grpc_connectivity_state) -> ConnectivityState? { - return ConnectivityState(rawValue: connectivityState.rawValue) - } -}