Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement connectivity state property and observers #196

Merged
merged 9 commits into from
May 26, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Sources/CgRPC/shim/cgrpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ cgrpc_completion_queue *cgrpc_channel_completion_queue(cgrpc_channel *channel);
grpc_connectivity_state cgrpc_channel_check_connectivity_state(
cgrpc_channel *channel, int try_to_connect);
void cgrpc_channel_watch_connectivity_state(cgrpc_channel *channel,
cgrpc_completion_queue * completion_queue,
cgrpc_completion_queue *completion_queue,
grpc_connectivity_state last_observed_state,
double deadline,
void *tag);
Expand Down
2 changes: 1 addition & 1 deletion Sources/CgRPC/shim/channel.c
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ grpc_connectivity_state cgrpc_channel_check_connectivity_state(cgrpc_channel *ch
return grpc_channel_check_connectivity_state(channel->channel, try_to_connect);
}

void cgrpc_channel_watch_connectivity_state(cgrpc_channel *channel, cgrpc_completion_queue * completion_queue, grpc_connectivity_state last_observed_state, double deadline, void *tag) {
void cgrpc_channel_watch_connectivity_state(cgrpc_channel *channel, cgrpc_completion_queue *completion_queue, grpc_connectivity_state last_observed_state, double deadline, void *tag) {
gpr_timespec deadline_seconds = cgrpc_deadline_in_seconds_from_now(deadline);
return grpc_channel_watch_connectivity_state(channel->channel, last_observed_state, deadline_seconds, completion_queue, tag);
}
91 changes: 42 additions & 49 deletions Sources/SwiftGRPC/Core/Channel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class Channel {
public var host: String

/// Connectivity state observers
private var observers: [ConnectivityObserver] = []
private var connectivityObservers: [ConnectivityObserver] = []

/// Initializes a gRPC channel
///
Expand All @@ -47,8 +47,7 @@ public class Channel {
} else {
underlyingChannel = cgrpc_channel_create(address)
}
completionQueue = CompletionQueue(
underlyingCompletionQueue: cgrpc_channel_completion_queue(underlyingChannel), name: "Client")
completionQueue = CompletionQueue(underlyingCompletionQueue: cgrpc_channel_completion_queue(underlyingChannel), name: "Client")
completionQueue.run() // start a loop that watches the channel's completion queue
}

Expand All @@ -59,13 +58,13 @@ public class Channel {
/// - Parameter host: an optional hostname override
public init(address: String, certificates: String, host: String?) {
self.host = address
underlyingChannel = cgrpc_channel_create_secure(address, certificates, host)
completionQueue = CompletionQueue(
underlyingCompletionQueue: cgrpc_channel_completion_queue(underlyingChannel), name: "Client")
underlyingChannel = cgrpc_channel_create_secure(address, certificates, &argumentValues, Int32(arguments.count))
completionQueue = CompletionQueue(underlyingCompletionQueue: cgrpc_channel_completion_queue(underlyingChannel), name: "Client")
completionQueue.run() // start a loop that watches the channel's completion queue
}

deinit {
connectivityObservers.forEach { $0.polling = false }
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Either use a lock for setting polling, or use $0.shutdown() (see the other comments).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got rid of polling.

cgrpc_channel_destroy(underlyingChannel)
completionQueue.shutdown()
}
Expand All @@ -86,83 +85,77 @@ public class Channel {
return ConnectivityState.connectivityState(cgrpc_channel_check_connectivity_state(underlyingChannel, tryToConnect ? 1 : 0))
}

public func subscribe(sourceState: ConnectivityState, tryToConnect: Bool = false, callback: @escaping (ConnectivityState) -> ()) {
var observer = observers.first(where: { $0.state == sourceState })

if observer == nil {
let newObserver = ConnectivityObserver(state: sourceState, underlyingChannel: underlyingChannel, tryToConnect: tryToConnect)
observers.append(newObserver)
observer = newObserver
}

observer?.callbacks.append(callback)
observer?.polling = true
public func subscribe(callback: @escaping (ConnectivityState) -> ()) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: -> Void?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

let observer = ConnectivityObserver(underlyingChannel: underlyingChannel, callback: callback)
observer.polling = true
connectivityObservers.append(observer)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In theory there could be race conditions here of subscribe were to be called from different threads, but I think we can ignore that possibility here.

}
}

private extension Channel {
class ConnectivityObserver: Equatable {
let state: ConnectivityState
let queue: CompletionQueue
let underlyingChannel: UnsafeMutableRawPointer
let underlyingCompletionQueue: UnsafeMutableRawPointer
private(set) var tryToConnect: Bool
var callbacks: [(ConnectivityState) -> ()] = []
class ConnectivityObserver {
private let completionQueue: CompletionQueue
private let underlyingChannel: UnsafeMutableRawPointer
private let underlyingCompletionQueue: UnsafeMutableRawPointer
private let callback: (ConnectivityState) -> ()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

super-nit: how about -> Void?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

private var lastState: ConnectivityState

private let queue: OperationQueue
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please just use SwiftGRPC.Mutex to guard all accesses to polling (including the loop below and connectivityObservers.forEach { $0.polling = false } above), an operation queue is overkill here. (Can be disregarded if we eliminate polling, but please still remove the queue variable in that case.)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got rid of polling.


var polling: Bool = false {
didSet {
if polling == true && oldValue == false {
run()
queue.addOperation { [weak self] in
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks to me like polling will only set to true once, right after initializing. So how about calling self.run() at the end of init(), and make shutdown available to Channel.deinit instead? In that case, we might even get by without a polling variable altogether if you add support for handling shutdown events in the spinloop.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got rid of polling

guard let `self` = self else { return }

if self.polling == true && oldValue == false {
self.run()
} else if self.polling == false && oldValue == true {
self.shutdown()
}
}
}
}

init(state: ConnectivityState, underlyingChannel: UnsafeMutableRawPointer, tryToConnect: Bool) {
self.state = state
init(underlyingChannel: UnsafeMutableRawPointer, callback: @escaping (ConnectivityState) -> ()) {
self.underlyingChannel = underlyingChannel
self.tryToConnect = tryToConnect
self.underlyingCompletionQueue = cgrpc_completion_queue_create_for_next()
self.queue = CompletionQueue(underlyingCompletionQueue: self.underlyingCompletionQueue, name: "Connectivity State")
self.completionQueue = CompletionQueue(underlyingCompletionQueue: self.underlyingCompletionQueue, name: "Connectivity State")
self.callback = callback
self.lastState = ConnectivityState.connectivityState(cgrpc_channel_check_connectivity_state(self.underlyingChannel, 0))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I would add a currentState argument to init and use the result of that here. The initializer call in Channel would become

let observer = ConnectivityObserver(underlyingChannel: underlyingChannel, currentState: self.connectivityState(), callback: callback)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


queue = OperationQueue()
queue.maxConcurrentOperationCount = 1
queue.qualityOfService = .background
}

deinit {
queue.shutdown()
shutdown()
}

private func run() {
DispatchQueue.global().async { [weak self] in
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use the thread-spawning pattern now used in e.g.

var threadLabel = "SwiftGRPC.CompletionQueue.runToCompletion.spinloopThread"
here now instead of dispatching to the global queue.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

guard let `self` = self, let underlyingState = self.lastState.underlyingState else { return }
guard let `self` = self else { return }
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the line that retains self by acquiring a strong reference to it. It does not make sense to try to get rid of it; instead avoid the [weak self] in guard let self = self altogether and keep the forEach { $0.shutdown() } line above. So I think at the moment your code still leaks threads.

To be honest, I'd like to have a test that spins up 100 observers to ensure all their spinloop threads get spun down once the channel closes (i.e. ensure no leaks are happening), but I can understand if you don't want to add one (should be fairly easy, though).

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI, I think this has not been resolved yet (2).


while self.polling {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Replace self.polling with (lock.synchronize { self.polling }). (Can be disregarded if we get rid of polling.)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got rid of polling.

guard !self.callbacks.isEmpty && !self.tryToConnect else {
self.polling = false
break
}

defer { self.tryToConnect = false }

guard let underlyingState = self.lastState.underlyingState else { return }
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Log an error in this case, as this would be very unexpected?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


let deadline: TimeInterval = 0.2
cgrpc_channel_watch_connectivity_state(self.underlyingChannel, self.underlyingCompletionQueue, underlyingState, deadline, nil)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't underlyingState be the last observed state? At the moment this is only updated once, at the very beginning of the spinloop, so this will fire for any states that are different from the state when the loop was started, instead of simply firing every time we detect any change.

let event = self.queue.wait(timeout: deadline)
let event = self.completionQueue.wait(timeout: deadline)

if event.success == 1 || self.tryToConnect {
let newState = ConnectivityState.connectivityState(cgrpc_channel_check_connectivity_state(self.underlyingChannel, self.tryToConnect ? 1 : 0))
if event.success == 1 {
let newState = ConnectivityState.connectivityState(cgrpc_channel_check_connectivity_state(self.underlyingChannel, 1))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you sure that this should always try to connect? This would make the connectivity observer reconnect all the time, even though the actual channel is currently not used by the client code.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, I missed that one.


guard newState != self.lastState else { continue }
defer { self.lastState = newState }

if self.lastState == self.state {
self.callbacks.forEach({ $0(newState) })
}
self.callback(newState)
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd suggest to handle queue shutdown at this point as well and exit the loop in that case. FYI, this loop retains self, which means that deinit (and thus shutdown) will never get called by itself — but shutdown will be indirectly called by Channel.deinit, so we should be fine here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I might miss something here, but I don't think this loop retains self. It's been weakened.

}
}
}

static func == (lhs: ConnectivityObserver, rhs: ConnectivityObserver) -> Bool {
return lhs.state == rhs.state
private func shutdown() {
completionQueue.shutdown()
}
}
}
Expand Down