Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement connectivity state property and observers #196

Merged
merged 9 commits into from
May 26, 2018
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions Sources/CgRPC/shim/cgrpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,11 @@ cgrpc_completion_queue *cgrpc_channel_completion_queue(cgrpc_channel *channel);

grpc_connectivity_state cgrpc_channel_check_connectivity_state(
cgrpc_channel *channel, int try_to_connect);
void cgrpc_channel_watch_connectivity_state(cgrpc_channel *channel,
cgrpc_completion_queue * completion_queue,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Please remove the space after the asterisk.

grpc_connectivity_state last_observed_state,
double deadline,
void *tag);

// server support
cgrpc_server *cgrpc_server_create(const char *address);
Expand All @@ -159,6 +164,7 @@ void cgrpc_server_start(cgrpc_server *s);
cgrpc_completion_queue *cgrpc_server_get_completion_queue(cgrpc_server *s);

// completion queues
cgrpc_completion_queue *cgrpc_completion_queue_create_for_next();
grpc_event cgrpc_completion_queue_get_next_event(cgrpc_completion_queue *cq,
double timeout);
void cgrpc_completion_queue_drain(cgrpc_completion_queue *cq);
Expand Down
5 changes: 5 additions & 0 deletions Sources/CgRPC/shim/channel.c
Original file line number Diff line number Diff line change
Expand Up @@ -103,3 +103,8 @@ cgrpc_completion_queue *cgrpc_channel_completion_queue(cgrpc_channel *channel) {
grpc_connectivity_state cgrpc_channel_check_connectivity_state(cgrpc_channel *channel, int try_to_connect) {
return grpc_channel_check_connectivity_state(channel->channel, try_to_connect);
}

void cgrpc_channel_watch_connectivity_state(cgrpc_channel *channel, cgrpc_completion_queue * completion_queue, grpc_connectivity_state last_observed_state, double deadline, void *tag) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Please remove the space after the asterisk.

gpr_timespec deadline_seconds = cgrpc_deadline_in_seconds_from_now(deadline);
return grpc_channel_watch_connectivity_state(channel->channel, last_observed_state, deadline_seconds, completion_queue, tag);
}
4 changes: 4 additions & 0 deletions Sources/CgRPC/shim/completion_queue.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@

#include <stdio.h>

grpc_completion_queue *cgrpc_completion_queue_create_for_next() {
return grpc_completion_queue_create_for_next(NULL);
}

grpc_event cgrpc_completion_queue_get_next_event(grpc_completion_queue *cq, double timeout) {
gpr_timespec deadline = cgrpc_deadline_in_seconds_from_now(timeout);
if (timeout < 0) {
Expand Down
149 changes: 145 additions & 4 deletions Sources/SwiftGRPC/Core/Channel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
#if SWIFT_PACKAGE
import CgRPC
import Dispatch
#endif
import Foundation

Expand All @@ -31,10 +32,9 @@ public class Channel {

/// Default host to use for new calls
public var host: String

public var connectivityState: ConnectivityState? {
return ConnectivityState.fromCEnum(cgrpc_channel_check_connectivity_state(underlyingChannel, 0))
}

/// Connectivity state observers
private var observers: [ConnectivityObserver] = []
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: rename this to connectivityObservers?


/// Initializes a gRPC channel
///
Expand Down Expand Up @@ -81,4 +81,145 @@ public class Channel {
let underlyingCall = cgrpc_channel_create_call(underlyingChannel, method, host, timeout)!
return Call(underlyingCall: underlyingCall, owned: true, completionQueue: completionQueue)
}

public func connectivityState(tryToConnect: Bool = false) -> ConnectivityState {
return ConnectivityState.connectivityState(cgrpc_channel_check_connectivity_state(underlyingChannel, tryToConnect ? 1 : 0))
}

public func subscribe(sourceState: ConnectivityState, tryToConnect: Bool = false, callback: @escaping (ConnectivityState) -> ()) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest removing tryToConnect here — it doesn't work anyway if subscribe is called with tryToConnect = true while a subscriber with that sourceState already exists.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you mind adding a comment to this method, especially to explain what sourceState is for? It looks to me like the callback is only fired if the channel's state changes to sourceState, is that correct? Why not subscribe to all state changes and have the caller exclude unwanted ones themselves in the callback (could be done with a simple if state != sourceState { return } in the callback)? That would also avoid spinning up a separate thread for each source state.

var observer = observers.first(where: { $0.state == sourceState })
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd suggest using a trailing closure here.


if observer == nil {
let newObserver = ConnectivityObserver(state: sourceState, underlyingChannel: underlyingChannel, tryToConnect: tryToConnect)
observers.append(newObserver)
observer = newObserver
}

observer?.callbacks.append(callback)
observer?.polling = true
}
}

private extension Channel {
class ConnectivityObserver: Equatable {
let state: ConnectivityState
let queue: CompletionQueue
let underlyingChannel: UnsafeMutableRawPointer
let underlyingCompletionQueue: UnsafeMutableRawPointer
private(set) var tryToConnect: Bool
var callbacks: [(ConnectivityState) -> ()] = []
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Access to callbacks definitely needs to be synchronized.

private var lastState: ConnectivityState

var polling: Bool = false {
didSet {
if polling == true && oldValue == false {
run()
}
}
}

init(state: ConnectivityState, underlyingChannel: UnsafeMutableRawPointer, tryToConnect: Bool) {
self.state = state
self.underlyingChannel = underlyingChannel
self.tryToConnect = tryToConnect
self.underlyingCompletionQueue = cgrpc_completion_queue_create_for_next()
self.queue = CompletionQueue(underlyingCompletionQueue: self.underlyingCompletionQueue, name: "Connectivity State")
self.lastState = ConnectivityState.connectivityState(cgrpc_channel_check_connectivity_state(self.underlyingChannel, 0))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I would add a currentState argument to init and use the result of that here. The initializer call in Channel would become

let observer = ConnectivityObserver(underlyingChannel: underlyingChannel, currentState: self.connectivityState(), callback: callback)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

}

deinit {
queue.shutdown()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks to me like this will never get deallocated, as you are acquiring a strong reference to self right before the spinloop starts and never let go of it until the end of the spinloop. I would suggest shutting down all observers in Channel.deinit as well. Let me know if I'm missing something.

}

private func run() {
DispatchQueue.global().async { [weak self] in
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use the thread-spawning pattern now used in e.g.

var threadLabel = "SwiftGRPC.CompletionQueue.runToCompletion.spinloopThread"
here now instead of dispatching to the global queue.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

guard let `self` = self, let underlyingState = self.lastState.underlyingState else { return }

while self.polling {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Replace self.polling with (lock.synchronize { self.polling }). (Can be disregarded if we get rid of polling.)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got rid of polling.

guard !self.callbacks.isEmpty && !self.tryToConnect else {
self.polling = false
break
}

defer { self.tryToConnect = false }

let deadline: TimeInterval = 0.2
cgrpc_channel_watch_connectivity_state(self.underlyingChannel, self.underlyingCompletionQueue, underlyingState, deadline, nil)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't underlyingState be the last observed state? At the moment this is only updated once, at the very beginning of the spinloop, so this will fire for any states that are different from the state when the loop was started, instead of simply firing every time we detect any change.

let event = self.queue.wait(timeout: deadline)

if event.success == 1 || self.tryToConnect {
let newState = ConnectivityState.connectivityState(cgrpc_channel_check_connectivity_state(self.underlyingChannel, self.tryToConnect ? 1 : 0))

guard newState != self.lastState else { continue }
defer { self.lastState = newState }

if self.lastState == self.state {
self.callbacks.forEach({ $0(newState) })
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Please use a trailing closure here.

}
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd suggest to handle queue shutdown at this point as well and exit the loop in that case. FYI, this loop retains self, which means that deinit (and thus shutdown) will never get called by itself — but shutdown will be indirectly called by Channel.deinit, so we should be fine here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I might miss something here, but I don't think this loop retains self. It's been weakened.

}
}
}

static func == (lhs: ConnectivityObserver, rhs: ConnectivityObserver) -> Bool {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Space before paren. Also, I would remove this method altogether — two observers on different channels would clearly not be equal just because their states are equal. It also doesn't seem to be used.

return lhs.state == rhs.state
}
}
}

extension Channel {
public enum ConnectivityState {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! This is much better than what I had added before.

/// Channel has just been initialized
case initialized
/// Channel is idle
case idle
/// Channel is connecting
case connecting
/// Channel is ready for work
case ready
/// Channel has seen a failure but expects to recover
case transientFailure
/// Channel has seen a failure that it cannot recover from
case shutdown
/// Channel connectivity state is unknown
case unknown

fileprivate static func connectivityState(_ value: grpc_connectivity_state) -> ConnectivityState {
switch value {
case GRPC_CHANNEL_INIT:
return .initialized
case GRPC_CHANNEL_IDLE:
return .idle
case GRPC_CHANNEL_CONNECTING:
return .connecting
case GRPC_CHANNEL_READY:
return .ready
case GRPC_CHANNEL_TRANSIENT_FAILURE:
return .transientFailure
case GRPC_CHANNEL_SHUTDOWN:
return .shutdown
default:
return .unknown
}
}

fileprivate var underlyingState: grpc_connectivity_state? {
switch self {
case .initialized:
return GRPC_CHANNEL_INIT
case .idle:
return GRPC_CHANNEL_IDLE
case .connecting:
return GRPC_CHANNEL_CONNECTING
case .ready:
return GRPC_CHANNEL_READY
case .transientFailure:
return GRPC_CHANNEL_TRANSIENT_FAILURE
case .shutdown:
return GRPC_CHANNEL_SHUTDOWN
default:
return nil
}
}
}
}
32 changes: 0 additions & 32 deletions Sources/SwiftGRPC/Core/ConnectivityState.swift

This file was deleted.