Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement connectivity state property and observers #196

Merged
merged 9 commits into from
May 26, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions Sources/CgRPC/shim/cgrpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,11 @@ cgrpc_completion_queue *cgrpc_channel_completion_queue(cgrpc_channel *channel);

grpc_connectivity_state cgrpc_channel_check_connectivity_state(
cgrpc_channel *channel, int try_to_connect);
void cgrpc_channel_watch_connectivity_state(cgrpc_channel *channel,
cgrpc_completion_queue *completion_queue,
grpc_connectivity_state last_observed_state,
double deadline,
void *tag);

// server support
cgrpc_server *cgrpc_server_create(const char *address);
Expand All @@ -190,6 +195,7 @@ void cgrpc_server_start(cgrpc_server *s);
cgrpc_completion_queue *cgrpc_server_get_completion_queue(cgrpc_server *s);

// completion queues
cgrpc_completion_queue *cgrpc_completion_queue_create_for_next();
grpc_event cgrpc_completion_queue_get_next_event(cgrpc_completion_queue *cq,
double timeout);
void cgrpc_completion_queue_drain(cgrpc_completion_queue *cq);
Expand Down
5 changes: 5 additions & 0 deletions Sources/CgRPC/shim/channel.c
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,8 @@ cgrpc_completion_queue *cgrpc_channel_completion_queue(cgrpc_channel *channel) {
grpc_connectivity_state cgrpc_channel_check_connectivity_state(cgrpc_channel *channel, int try_to_connect) {
return grpc_channel_check_connectivity_state(channel->channel, try_to_connect);
}

void cgrpc_channel_watch_connectivity_state(cgrpc_channel *channel, cgrpc_completion_queue *completion_queue, grpc_connectivity_state last_observed_state, double deadline, void *tag) {
gpr_timespec deadline_seconds = cgrpc_deadline_in_seconds_from_now(deadline);
return grpc_channel_watch_connectivity_state(channel->channel, last_observed_state, deadline_seconds, completion_queue, tag);
}
4 changes: 4 additions & 0 deletions Sources/CgRPC/shim/completion_queue.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@

#include <stdio.h>

grpc_completion_queue *cgrpc_completion_queue_create_for_next() {
return grpc_completion_queue_create_for_next(NULL);
}

grpc_event cgrpc_completion_queue_get_next_event(grpc_completion_queue *cq, double timeout) {
gpr_timespec deadline = cgrpc_deadline_in_seconds_from_now(timeout);
if (timeout < 0) {
Expand Down
147 changes: 138 additions & 9 deletions Sources/SwiftGRPC/Core/Channel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
#if SWIFT_PACKAGE
import CgRPC
import Dispatch
#endif
import Foundation

Expand All @@ -31,10 +32,9 @@ public class Channel {

/// Default host to use for new calls
public var host: String

public var connectivityState: ConnectivityState? {
return ConnectivityState.fromCEnum(cgrpc_channel_check_connectivity_state(underlyingChannel, 0))
}

/// Connectivity state observers
private var connectivityObservers: [ConnectivityObserver] = []

/// Initializes a gRPC channel
///
Expand All @@ -52,8 +52,7 @@ public class Channel {
} else {
underlyingChannel = cgrpc_channel_create(address, &argumentValues, Int32(arguments.count))
}
completionQueue = CompletionQueue(
underlyingCompletionQueue: cgrpc_channel_completion_queue(underlyingChannel), name: "Client")
completionQueue = CompletionQueue(underlyingCompletionQueue: cgrpc_channel_completion_queue(underlyingChannel), name: "Client")
completionQueue.run() // start a loop that watches the channel's completion queue
}

Expand All @@ -64,17 +63,17 @@ public class Channel {
/// - Parameter arguments: list of channel configuration options
public init(address: String, certificates: String, arguments: [Argument] = []) {
gRPC.initialize()
self.host = address
host = address
let argumentWrappers = arguments.map { $0.toCArg() }
var argumentValues = argumentWrappers.map { $0.wrapped }

underlyingChannel = cgrpc_channel_create_secure(address, certificates, &argumentValues, Int32(arguments.count))
completionQueue = CompletionQueue(
underlyingCompletionQueue: cgrpc_channel_completion_queue(underlyingChannel), name: "Client")
completionQueue = CompletionQueue(underlyingCompletionQueue: cgrpc_channel_completion_queue(underlyingChannel), name: "Client")
completionQueue.run() // start a loop that watches the channel's completion queue
}

deinit {
connectivityObservers.forEach { $0.shutdown() }
cgrpc_channel_destroy(underlyingChannel)
completionQueue.shutdown()
}
Expand All @@ -90,4 +89,134 @@ public class Channel {
let underlyingCall = cgrpc_channel_create_call(underlyingChannel, method, host, timeout)!
return Call(underlyingCall: underlyingCall, owned: true, completionQueue: completionQueue)
}

/// Check the current connectivity state
///
/// - Parameter tryToConnect: boolean value to indicate if should try to connect if channel's connectivity state is idle
/// - Returns: a ConnectivityState value representing the current connectivity state of the channel
public func connectivityState(tryToConnect: Bool = false) -> ConnectivityState {
return ConnectivityState.connectivityState(cgrpc_channel_check_connectivity_state(underlyingChannel, tryToConnect ? 1 : 0))
}

/// Subscribe to connectivity state changes
///
/// - Parameter callback: block executed every time a new connectivity state is detected
public func subscribe(callback: @escaping (ConnectivityState) -> Void) {
connectivityObservers.append(ConnectivityObserver(underlyingChannel: underlyingChannel, currentState: connectivityState(), callback: callback))
}
}

private extension Channel {
class ConnectivityObserver {
private let completionQueue: CompletionQueue
private let underlyingChannel: UnsafeMutableRawPointer
private let underlyingCompletionQueue: UnsafeMutableRawPointer
private let callback: (ConnectivityState) -> Void
private var lastState: ConnectivityState

init(underlyingChannel: UnsafeMutableRawPointer, currentState: ConnectivityState, callback: @escaping (ConnectivityState) -> ()) {
self.underlyingChannel = underlyingChannel
self.underlyingCompletionQueue = cgrpc_completion_queue_create_for_next()
self.completionQueue = CompletionQueue(underlyingCompletionQueue: self.underlyingCompletionQueue, name: "Connectivity State")
self.callback = callback
self.lastState = currentState
run()
}

deinit {
shutdown()
}

private func run() {
let spinloopThreadQueue = DispatchQueue(label: "SwiftGRPC.ConnectivityObserver.run.spinloopThread")

spinloopThreadQueue.async {
while true {
guard let underlyingState = self.lastState.underlyingState else { return }

let deadline: TimeInterval = 0.2
cgrpc_channel_watch_connectivity_state(self.underlyingChannel, self.underlyingCompletionQueue, underlyingState, deadline, nil)
let event = self.completionQueue.wait(timeout: deadline)

switch event.type {
case .complete:
let newState = ConnectivityState.connectivityState(cgrpc_channel_check_connectivity_state(self.underlyingChannel, 0))

if newState != self.lastState {
self.callback(newState)
}

self.lastState = newState
case .queueTimeout:
continue
case .queueShutdown:
return
default:
continue
}
}
}
}

func shutdown() {
completionQueue.shutdown()
}
}
}

extension Channel {
public enum ConnectivityState {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! This is much better than what I had added before.

/// Channel has just been initialized
case initialized
/// Channel is idle
case idle
/// Channel is connecting
case connecting
/// Channel is ready for work
case ready
/// Channel has seen a failure but expects to recover
case transientFailure
/// Channel has seen a failure that it cannot recover from
case shutdown
/// Channel connectivity state is unknown
case unknown

fileprivate static func connectivityState(_ value: grpc_connectivity_state) -> ConnectivityState {
switch value {
case GRPC_CHANNEL_INIT:
return .initialized
case GRPC_CHANNEL_IDLE:
return .idle
case GRPC_CHANNEL_CONNECTING:
return .connecting
case GRPC_CHANNEL_READY:
return .ready
case GRPC_CHANNEL_TRANSIENT_FAILURE:
return .transientFailure
case GRPC_CHANNEL_SHUTDOWN:
return .shutdown
default:
return .unknown
}
}

fileprivate var underlyingState: grpc_connectivity_state? {
switch self {
case .initialized:
return GRPC_CHANNEL_INIT
case .idle:
return GRPC_CHANNEL_IDLE
case .connecting:
return GRPC_CHANNEL_CONNECTING
case .ready:
return GRPC_CHANNEL_READY
case .transientFailure:
return GRPC_CHANNEL_TRANSIENT_FAILURE
case .shutdown:
return GRPC_CHANNEL_SHUTDOWN
default:
return nil
}
}
}
}
32 changes: 0 additions & 32 deletions Sources/SwiftGRPC/Core/ConnectivityState.swift

This file was deleted.