Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement connectivity state property and observers #196

Merged
merged 9 commits into from
May 26, 2018
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions Sources/CgRPC/shim/cgrpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,11 @@ cgrpc_completion_queue *cgrpc_channel_completion_queue(cgrpc_channel *channel);

grpc_connectivity_state cgrpc_channel_check_connectivity_state(
cgrpc_channel *channel, int try_to_connect);
void cgrpc_channel_watch_connectivity_state(cgrpc_channel *channel,
cgrpc_completion_queue *completion_queue,
grpc_connectivity_state last_observed_state,
double deadline,
void *tag);

// server support
cgrpc_server *cgrpc_server_create(const char *address);
Expand All @@ -159,6 +164,7 @@ void cgrpc_server_start(cgrpc_server *s);
cgrpc_completion_queue *cgrpc_server_get_completion_queue(cgrpc_server *s);

// completion queues
cgrpc_completion_queue *cgrpc_completion_queue_create_for_next();
grpc_event cgrpc_completion_queue_get_next_event(cgrpc_completion_queue *cq,
double timeout);
void cgrpc_completion_queue_drain(cgrpc_completion_queue *cq);
Expand Down
5 changes: 5 additions & 0 deletions Sources/CgRPC/shim/channel.c
Original file line number Diff line number Diff line change
Expand Up @@ -103,3 +103,8 @@ cgrpc_completion_queue *cgrpc_channel_completion_queue(cgrpc_channel *channel) {
grpc_connectivity_state cgrpc_channel_check_connectivity_state(cgrpc_channel *channel, int try_to_connect) {
return grpc_channel_check_connectivity_state(channel->channel, try_to_connect);
}

// Registers a one-shot watch on `channel`'s connectivity state: when the state
// changes away from `last_observed_state`, or after `deadline` seconds elapse,
// an event tagged with `tag` is delivered on `completion_queue`.
// Note: `return` with an expression is invalid in a void function (C99 6.8.6.4),
// so the core call is made as a plain statement.
void cgrpc_channel_watch_connectivity_state(cgrpc_channel *channel, cgrpc_completion_queue *completion_queue, grpc_connectivity_state last_observed_state, double deadline, void *tag) {
  gpr_timespec deadline_seconds = cgrpc_deadline_in_seconds_from_now(deadline);
  grpc_channel_watch_connectivity_state(channel->channel, last_observed_state, deadline_seconds, completion_queue, tag);
}
4 changes: 4 additions & 0 deletions Sources/CgRPC/shim/completion_queue.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@

#include <stdio.h>

// Creates a completion queue to be polled with
// grpc_completion_queue_next()-style calls. The caller owns the returned
// queue and must shut it down and destroy it. The NULL argument is the
// reserved pointer required by the core API.
// `(void)` gives the definition a proper prototype; the bare `()` form
// declares an unspecified parameter list in C.
grpc_completion_queue *cgrpc_completion_queue_create_for_next(void) {
  return grpc_completion_queue_create_for_next(NULL);
}

grpc_event cgrpc_completion_queue_get_next_event(grpc_completion_queue *cq, double timeout) {
gpr_timespec deadline = cgrpc_deadline_in_seconds_from_now(timeout);
if (timeout < 0) {
Expand Down
152 changes: 143 additions & 9 deletions Sources/SwiftGRPC/Core/Channel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
#if SWIFT_PACKAGE
import CgRPC
import Dispatch
#endif
import Foundation

Expand All @@ -31,10 +32,9 @@ public class Channel {

/// Default host to use for new calls
public var host: String

public var connectivityState: ConnectivityState? {
return ConnectivityState.fromCEnum(cgrpc_channel_check_connectivity_state(underlyingChannel, 0))
}

/// Connectivity state observers
private var connectivityObservers: [ConnectivityObserver] = []

/// Initializes a gRPC channel
///
Expand All @@ -47,8 +47,7 @@ public class Channel {
} else {
underlyingChannel = cgrpc_channel_create(address)
}
completionQueue = CompletionQueue(
underlyingCompletionQueue: cgrpc_channel_completion_queue(underlyingChannel), name: "Client")
completionQueue = CompletionQueue(underlyingCompletionQueue: cgrpc_channel_completion_queue(underlyingChannel), name: "Client")
completionQueue.run() // start a loop that watches the channel's completion queue
}

Expand All @@ -59,13 +58,13 @@ public class Channel {
/// - Parameter host: an optional hostname override
public init(address: String, certificates: String, host: String?) {
self.host = address
underlyingChannel = cgrpc_channel_create_secure(address, certificates, host)
completionQueue = CompletionQueue(
underlyingCompletionQueue: cgrpc_channel_completion_queue(underlyingChannel), name: "Client")
underlyingChannel = cgrpc_channel_create_secure(address, certificates, &argumentValues, Int32(arguments.count))
completionQueue = CompletionQueue(underlyingCompletionQueue: cgrpc_channel_completion_queue(underlyingChannel), name: "Client")
completionQueue.run() // start a loop that watches the channel's completion queue
}

deinit {
connectivityObservers.forEach { $0.polling = false }
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Either use a lock for setting polling, or use $0.shutdown() (see the other comments).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got rid of polling.

cgrpc_channel_destroy(underlyingChannel)
completionQueue.shutdown()
}
Expand All @@ -81,4 +80,139 @@ public class Channel {
let underlyingCall = cgrpc_channel_create_call(underlyingChannel, method, host, timeout)!
return Call(underlyingCall: underlyingCall, owned: true, completionQueue: completionQueue)
}

/// Returns the channel's current connectivity state.
/// - Parameter tryToConnect: if `true`, also asks the channel to initiate a
///   connection attempt when it is currently idle.
public func connectivityState(tryToConnect: Bool = false) -> ConnectivityState {
  let rawState = cgrpc_channel_check_connectivity_state(underlyingChannel, tryToConnect ? 1 : 0)
  return ConnectivityState.connectivityState(rawState)
}

public func subscribe(callback: @escaping (ConnectivityState) -> ()) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: -> Void?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

let observer = ConnectivityObserver(underlyingChannel: underlyingChannel, callback: callback)
observer.polling = true
connectivityObservers.append(observer)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In theory there could be race conditions here if subscribe were to be called from different threads, but I think we can ignore that possibility here.

}
}

private extension Channel {
class ConnectivityObserver {
private let completionQueue: CompletionQueue
private let underlyingChannel: UnsafeMutableRawPointer
private let underlyingCompletionQueue: UnsafeMutableRawPointer
private let callback: (ConnectivityState) -> ()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

super-nit: how about -> Void?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

private var lastState: ConnectivityState
private let queue: OperationQueue
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please just use SwiftGRPC.Mutex to guard all accesses to polling (including the loop below and connectivityObservers.forEach { $0.polling = false } above), an operation queue is overkill here. (Can be disregarded if we eliminate polling, but please still remove the queue variable in that case.)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got rid of polling.


var polling: Bool = false {
didSet {
queue.addOperation { [weak self] in
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks to me like polling will only set to true once, right after initializing. So how about calling self.run() at the end of init(), and make shutdown available to Channel.deinit instead? In that case, we might even get by without a polling variable altogether if you add support for handling shutdown events in the spinloop.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got rid of polling

guard let `self` = self else { return }

if self.polling == true && oldValue == false {
self.run()
} else if self.polling == false && oldValue == true {
self.shutdown()
}
}
}
}

init(underlyingChannel: UnsafeMutableRawPointer, callback: @escaping (ConnectivityState) -> ()) {
self.underlyingChannel = underlyingChannel
self.underlyingCompletionQueue = cgrpc_completion_queue_create_for_next()
self.completionQueue = CompletionQueue(underlyingCompletionQueue: self.underlyingCompletionQueue, name: "Connectivity State")
self.callback = callback
self.lastState = ConnectivityState.connectivityState(cgrpc_channel_check_connectivity_state(self.underlyingChannel, 0))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I would add a currentState argument to init and use the result of that here. The initializer call in Channel would become

let observer = ConnectivityObserver(underlyingChannel: underlyingChannel, currentState: self.connectivityState(), callback: callback)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


queue = OperationQueue()
queue.maxConcurrentOperationCount = 1
queue.qualityOfService = .background
}

deinit {
shutdown()
}

private func run() {
DispatchQueue.global().async { [weak self] in
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use the thread-spawning pattern now used in e.g.

var threadLabel = "SwiftGRPC.CompletionQueue.runToCompletion.spinloopThread"
here now instead of dispatching to the global queue.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

guard let `self` = self else { return }
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the line that retains self by acquiring a strong reference to it. It does not make sense to try to get rid of it; instead avoid the [weak self] in guard let self = self altogether and keep the forEach { $0.shutdown() } line above. So I think at the moment your code still leaks threads.

To be honest, I'd like to have a test that spins up 100 observers to ensure all their spinloop threads get spun down once the channel closes (i.e. ensure no leaks are happening), but I can understand if you don't want to add one (should be fairly easy, though).

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI, I think this has not been resolved yet (2).


while self.polling {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Replace self.polling with (lock.synchronize { self.polling }). (Can be disregarded if we get rid of polling.)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got rid of polling.

guard let underlyingState = self.lastState.underlyingState else { return }
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Log an error in this case, as this would be very unexpected?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


let deadline: TimeInterval = 0.2
cgrpc_channel_watch_connectivity_state(self.underlyingChannel, self.underlyingCompletionQueue, underlyingState, deadline, nil)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't underlyingState be the last observed state? At the moment this is only updated once, at the very beginning of the spinloop, so this will fire for any states that are different from the state when the loop was started, instead of simply firing every time we detect any change.

let event = self.completionQueue.wait(timeout: deadline)

if event.success == 1 {
let newState = ConnectivityState.connectivityState(cgrpc_channel_check_connectivity_state(self.underlyingChannel, 1))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you sure that this should always try to connect? This would make the connectivity observer reconnect all the time, even though the actual channel is currently not used by the client code.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, I missed that one.


guard newState != self.lastState else { continue }
defer { self.lastState = newState }

self.callback(newState)
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd suggest to handle queue shutdown at this point as well and exit the loop in that case. FYI, this loop retains self, which means that deinit (and thus shutdown) will never get called by itself — but shutdown will be indirectly called by Channel.deinit, so we should be fine here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I might miss something here, but I don't think this loop retains self. It's been weakened.

}
}
}

/// Stops this observer by shutting down its dedicated completion queue;
/// the polling loop waiting on that queue then terminates.
private func shutdown() {
completionQueue.shutdown()
}
}
}

extension Channel {
public enum ConnectivityState {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! This is much better than what I had added before.

/// Channel has just been initialized
case initialized
/// Channel is idle
case idle
/// Channel is connecting
case connecting
/// Channel is ready for work
case ready
/// Channel has seen a failure but expects to recover
case transientFailure
/// Channel has seen a failure that it cannot recover from
case shutdown
/// Channel connectivity state is unknown
case unknown

/// Maps a gRPC core `grpc_connectivity_state` value onto the Swift-side
/// `ConnectivityState` enum. Any value without a known mapping becomes
/// `.unknown`.
fileprivate static func connectivityState(_ value: grpc_connectivity_state) -> ConnectivityState {
  if value == GRPC_CHANNEL_INIT {
    return .initialized
  } else if value == GRPC_CHANNEL_IDLE {
    return .idle
  } else if value == GRPC_CHANNEL_CONNECTING {
    return .connecting
  } else if value == GRPC_CHANNEL_READY {
    return .ready
  } else if value == GRPC_CHANNEL_TRANSIENT_FAILURE {
    return .transientFailure
  } else if value == GRPC_CHANNEL_SHUTDOWN {
    return .shutdown
  } else {
    return .unknown
  }
}

/// The gRPC core enum value corresponding to this state, or `nil` for
/// `.unknown`, which has no core counterpart.
/// The switch is exhaustive over this file's own enum (no `default:`), so
/// adding a new case is a compile-time error here rather than a silent `nil`.
fileprivate var underlyingState: grpc_connectivity_state? {
  switch self {
  case .initialized:
    return GRPC_CHANNEL_INIT
  case .idle:
    return GRPC_CHANNEL_IDLE
  case .connecting:
    return GRPC_CHANNEL_CONNECTING
  case .ready:
    return GRPC_CHANNEL_READY
  case .transientFailure:
    return GRPC_CHANNEL_TRANSIENT_FAILURE
  case .shutdown:
    return GRPC_CHANNEL_SHUTDOWN
  case .unknown:
    return nil
  }
}
}
}
32 changes: 0 additions & 32 deletions Sources/SwiftGRPC/Core/ConnectivityState.swift

This file was deleted.