Implement connectivity state property and observers #196
Thanks for this! Would it be possible to add a simple test, at least to ensure that this works for some state changes?
Sources/CgRPC/shim/cgrpc.h
Outdated
@@ -147,6 +147,11 @@ cgrpc_completion_queue *cgrpc_channel_completion_queue(cgrpc_channel *channel); | |||
|
|||
grpc_connectivity_state cgrpc_channel_check_connectivity_state( | |||
cgrpc_channel *channel, int try_to_connect); | |||
void cgrpc_channel_watch_connectivity_state(cgrpc_channel *channel, | |||
cgrpc_completion_queue * completion_queue, |
Nit: Please remove the space after the asterisk.
Sources/CgRPC/shim/channel.c
Outdated
@@ -103,3 +103,8 @@ cgrpc_completion_queue *cgrpc_channel_completion_queue(cgrpc_channel *channel) { | |||
grpc_connectivity_state cgrpc_channel_check_connectivity_state(cgrpc_channel *channel, int try_to_connect) { | |||
return grpc_channel_check_connectivity_state(channel->channel, try_to_connect); | |||
} | |||
|
|||
void cgrpc_channel_watch_connectivity_state(cgrpc_channel *channel, cgrpc_completion_queue * completion_queue, grpc_connectivity_state last_observed_state, double deadline, void *tag) { |
Nit: Please remove the space after the asterisk.
Sources/SwiftGRPC/Core/Channel.swift
Outdated
} | ||
|
||
/// Connectivity state observers | ||
private var observers: [ConnectivityObserver] = [] |
Nit: rename this to `connectivityObservers`?
Sources/SwiftGRPC/Core/Channel.swift
Outdated
} | ||
|
||
public func subscribe(sourceState: ConnectivityState, tryToConnect: Bool = false, callback: @escaping (ConnectivityState) -> ()) { | ||
var observer = observers.first(where: { $0.state == sourceState }) |
I'd suggest using a trailing closure here.
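As an illustration of the suggestion, here is a minimal sketch of the same `first(where:)` lookup written both ways. `StateObserver` and its string-typed `state` are hypothetical stand-ins for the PR's `ConnectivityObserver`, not the actual type.

```swift
// Hypothetical stand-in for the PR's ConnectivityObserver (assumption).
struct StateObserver {
    let state: String
}

let observers = [StateObserver(state: "idle"), StateObserver(state: "ready")]

// Closure passed inside the parentheses, as in the diff under review.
let explicit = observers.first(where: { $0.state == "ready" })

// The same call using trailing-closure syntax.
let trailing = observers.first { $0.state == "ready" }
```

Both forms call the same method; the trailing form only drops the argument label and parentheses.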
Sources/SwiftGRPC/Core/Channel.swift
Outdated
defer { self.lastState = newState } | ||
|
||
if self.lastState == self.state { | ||
self.callbacks.forEach({ $0(newState) }) |
Nit: Please use a trailing closure here.
Sources/SwiftGRPC/Core/Channel.swift
Outdated
return ConnectivityState.connectivityState(cgrpc_channel_check_connectivity_state(underlyingChannel, tryToConnect ? 1 : 0)) | ||
} | ||
|
||
public func subscribe(sourceState: ConnectivityState, tryToConnect: Bool = false, callback: @escaping (ConnectivityState) -> ()) { |
I would suggest removing `tryToConnect` here; it doesn't work anyway if `subscribe` is called with `tryToConnect = true` while a subscriber with that `sourceState` already exists.
Sources/SwiftGRPC/Core/Channel.swift
Outdated
return ConnectivityState.connectivityState(cgrpc_channel_check_connectivity_state(underlyingChannel, tryToConnect ? 1 : 0)) | ||
} | ||
|
||
public func subscribe(sourceState: ConnectivityState, tryToConnect: Bool = false, callback: @escaping (ConnectivityState) -> ()) { |
Would you mind adding a comment to this method, especially to explain what `sourceState` is for? It looks to me like the callback is only fired if the channel's state changes to `sourceState`, is that correct? Why not subscribe to all state changes and have the caller exclude unwanted ones themselves in the callback (could be done with a simple `if state != sourceState { return }` in the callback)? That would also avoid spinning up a separate thread for each source state.
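A minimal sketch of the caller-side filtering proposed here, assuming a single `subscribe(callback:)` that reports every change. `StateNotifier` and this `ConnectivityState` enum are hypothetical stand-ins for the channel and its state type, used only to make the example self-contained.

```swift
// Hypothetical state enum and notifier standing in for Channel (assumptions).
enum ConnectivityState { case idle, connecting, ready, transientFailure, shutdown }

final class StateNotifier {
    private var callbacks: [(ConnectivityState) -> Void] = []

    // Registers a callback that is invoked for every state change.
    func subscribe(callback: @escaping (ConnectivityState) -> Void) {
        callbacks.append(callback)
    }

    // Simulates the channel reporting a new state.
    func emit(_ state: ConnectivityState) {
        callbacks.forEach { $0(state) }
    }
}

let notifier = StateNotifier()
let sourceState = ConnectivityState.ready
var readyCount = 0

notifier.subscribe { state in
    // The caller drops the states it does not care about.
    if state != sourceState { return }
    readyCount += 1
}

let simulated: [ConnectivityState] = [.connecting, .ready, .transientFailure, .ready]
for state in simulated {
    notifier.emit(state)
}
```

With this shape, one observer serves all callers, and each caller decides which transitions matter.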
Sources/SwiftGRPC/Core/Channel.swift
Outdated
defer { self.tryToConnect = false } | ||
|
||
let deadline: TimeInterval = 0.2 | ||
cgrpc_channel_watch_connectivity_state(self.underlyingChannel, self.underlyingCompletionQueue, underlyingState, deadline, nil) |
Shouldn't `underlyingState` be the last observed state? At the moment this is only updated once, at the very beginning of the spinloop, so this will fire for any states that are different from the state when the loop was started, instead of simply firing every time we detect any change.
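The difference can be shown with a small simulation. This sketch does not call gRPC; `reportedChanges` is a hypothetical helper that compares each polled state against a baseline and either refreshes that baseline after every report or leaves it at its initial value.

```swift
// Hypothetical state enum for the simulation (assumption).
enum ConnectivityState { case idle, connecting, ready }

/// Returns the states that would be reported to a callback.
/// When `updateLastObserved` is false, the baseline stays at `initial`,
/// mirroring the behaviour questioned in the comment above.
func reportedChanges(polled: [ConnectivityState],
                     initial: ConnectivityState,
                     updateLastObserved: Bool) -> [ConnectivityState] {
    var lastObserved = initial
    var reported: [ConnectivityState] = []
    for state in polled {
        if state == lastObserved { continue }
        reported.append(state)
        if updateLastObserved { lastObserved = state }
    }
    return reported
}

let polled: [ConnectivityState] = [.idle, .connecting, .connecting, .ready, .ready, .idle]
let tracked = reportedChanges(polled: polled, initial: .idle, updateLastObserved: true)
let stale = reportedChanges(polled: polled, initial: .idle, updateLastObserved: false)
```

With a refreshed baseline, each transition is reported once, including the return to `.idle`. With a stale baseline, repeated polls of the same state are reported again and the return to the initial state is missed.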
Sources/SwiftGRPC/Core/Channel.swift
Outdated
} | ||
|
||
deinit { | ||
queue.shutdown() |
It looks to me like this will never get deallocated, as you are acquiring a strong reference to `self` right before the spinloop starts and never let go of it until the end of the spinloop. I would suggest shutting down all observers in `Channel.deinit` as well. Let me know if I'm missing something.
} | ||
|
||
extension Channel { | ||
public enum ConnectivityState { |
Thanks! This is much better than what I had added before.
Sources/SwiftGRPC/Core/Channel.swift
Outdated
let underlyingChannel: UnsafeMutableRawPointer | ||
let underlyingCompletionQueue: UnsafeMutableRawPointer | ||
private(set) var tryToConnect: Bool | ||
var callbacks: [(ConnectivityState) -> ()] = [] |
Access to `callbacks` definitely needs to be synchronized.
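One way to synchronize such an array is sketched below. It uses Foundation's `NSLock` as an assumption; the thread later mentions `SwiftGRPC.Mutex`, whose API is not shown here, so it is not used. `CallbackList` is a hypothetical helper, not part of the PR.

```swift
import Foundation

// Hypothetical lock-guarded callback container (assumption, not the PR's code).
final class CallbackList<Value> {
    private let lock = NSLock()
    private var callbacks: [(Value) -> Void] = []

    // Appends a callback while holding the lock.
    func append(_ callback: @escaping (Value) -> Void) {
        lock.lock()
        defer { lock.unlock() }
        callbacks.append(callback)
    }

    // Copies the array under the lock, then invokes the callbacks outside it,
    // so a callback that subscribes again cannot deadlock.
    func notify(_ value: Value) {
        lock.lock()
        let snapshot = callbacks
        lock.unlock()
        snapshot.forEach { $0(value) }
    }

    var count: Int {
        lock.lock()
        defer { lock.unlock() }
        return callbacks.count
    }
}

let list = CallbackList<Int>()
// Append from many threads at once; the lock keeps the array consistent.
DispatchQueue.concurrentPerform(iterations: 100) { _ in
    list.append { _ in }
}
```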
Update: haven't had a chance to get to these fixes. I plan on coming back to it by the end of the week.
No worries! Most of these are just nits, anyway; but the possible thread leak and synchronization issues should definitely be investigated. Maybe there are actually no problems, but it's still worth checking.
@cvanderschuere Looks great! We'll merge after those requested changes. Thanks for this!
@cvanderschuere @MrMage Is there more coming in this PR? If not, I'll merge it and we can address anything remaining in future PRs.
I don’t think this should be merged until the comments have been addressed. I have not had a chance to get back to this, but do plan on it eventually. I would also not be opposed to someone taking over the PR and making the necessary changes.
I changed a few things. Let me know @MrMage if it's more in line with what you have in mind.
Thank you for the changes! I have much more trust in this now, but there's one more big request/suggestion:
Please handle shutdown events in the spinloop inside `run` (I think there should be an example of this in `CompletionQueue.swift`). If you do that, we might get rid of the `polling` variable altogether (and instead just call `observer.shutdown()` from `Channel.deinit`), and in that case you can disregard my comments about serializing access to `polling`. If you decide to keep the `polling` variable, however, all accesses to it should be guarded by a lock (but the operation queue should be unnecessary).
What do you think?
Sources/SwiftGRPC/Core/Channel.swift
Outdated
private var lastState: ConnectivityState | ||
|
||
private let queue: OperationQueue |
Please just use `SwiftGRPC.Mutex` to guard all accesses to `polling` (including the loop below and `connectivityObservers.forEach { $0.polling = false }` above); an operation queue is overkill here. (Can be disregarded if we eliminate `polling`, but please still remove the `queue` variable in that case.)
Got rid of `polling`.
Sources/SwiftGRPC/Core/Channel.swift
Outdated
} | ||
|
||
deinit { | ||
queue.shutdown() | ||
shutdown() | ||
} | ||
|
||
private func run() { | ||
DispatchQueue.global().async { [weak self] in |
Please use the thread-spawning pattern now used in e.g.
var threadLabel = "SwiftGRPC.CompletionQueue.runToCompletion.spinloopThread" |
Done
Sources/SwiftGRPC/Core/Channel.swift
Outdated
} | ||
|
||
private func run() { | ||
DispatchQueue.global().async { [weak self] in | ||
guard let `self` = self, let underlyingState = self.lastState.underlyingState else { return } | ||
guard let `self` = self else { return } | ||
|
||
while self.polling { |
Replace `self.polling` with `(lock.synchronize { self.polling })`. (Can be disregarded if we get rid of `polling`.)
Got rid of `polling`.
Sources/SwiftGRPC/Core/Channel.swift
Outdated
if self.lastState == self.state { | ||
self.callbacks.forEach({ $0(newState) }) | ||
} | ||
self.callback(newState) | ||
} |
I'd suggest handling queue shutdown at this point as well and exiting the loop in that case. FYI, this loop retains `self`, which means that `deinit` (and thus `shutdown`) will never get called by itself. However, `shutdown` will be indirectly called by `Channel.deinit`, so we should be fine here.
I might be missing something here, but I don't think this loop retains `self`. It's been weakened.
Sources/SwiftGRPC/Core/Channel.swift
Outdated
if event.success == 1 || self.tryToConnect { | ||
let newState = ConnectivityState.connectivityState(cgrpc_channel_check_connectivity_state(self.underlyingChannel, self.tryToConnect ? 1 : 0)) | ||
if event.success == 1 { | ||
let newState = ConnectivityState.connectivityState(cgrpc_channel_check_connectivity_state(self.underlyingChannel, 1)) |
Are you sure that this should always try to connect? This would make the connectivity observer reconnect all the time, even though the actual channel is currently not used by the client code.
You're right, I missed that one.
Sources/SwiftGRPC/Core/Channel.swift
Outdated
|
||
defer { self.tryToConnect = false } | ||
|
||
guard let underlyingState = self.lastState.underlyingState else { return } |
Log an error in this case, as this would be very unexpected?
Done
Sources/SwiftGRPC/Core/Channel.swift
Outdated
self.underlyingCompletionQueue = cgrpc_completion_queue_create_for_next() | ||
self.queue = CompletionQueue(underlyingCompletionQueue: self.underlyingCompletionQueue, name: "Connectivity State") | ||
self.completionQueue = CompletionQueue(underlyingCompletionQueue: self.underlyingCompletionQueue, name: "Connectivity State") | ||
self.callback = callback | ||
self.lastState = ConnectivityState.connectivityState(cgrpc_channel_check_connectivity_state(self.underlyingChannel, 0)) |
nit: I would add a `currentState` argument to `init` and use the result of that here. The initializer call in `Channel` would become
let observer = ConnectivityObserver(underlyingChannel: underlyingChannel, currentState: self.connectivityState(), callback: callback)
Done
Sources/SwiftGRPC/Core/Channel.swift
Outdated
var polling: Bool = false { | ||
didSet { | ||
if polling == true && oldValue == false { | ||
run() | ||
queue.addOperation { [weak self] in |
It looks to me like `polling` will only be set to `true` once, right after initializing. So how about calling `self.run()` at the end of `init()`, and making `shutdown` available to `Channel.deinit` instead? In that case, we might even get by without a `polling` variable altogether if you add support for handling shutdown events in the spinloop.
Got rid of polling
Sources/SwiftGRPC/Core/Channel.swift
Outdated
completionQueue.run() // start a loop that watches the channel's completion queue | ||
} | ||
|
||
deinit { | ||
connectivityObservers.forEach { $0.polling = false } |
Either use a lock for setting `polling`, or use `$0.shutdown()` (see the other comments).
Got rid of `polling`.
Sources/SwiftGRPC/Core/Channel.swift
Outdated
public func subscribe(callback: @escaping (ConnectivityState) -> ()) { | ||
let observer = ConnectivityObserver(underlyingChannel: underlyingChannel, callback: callback) | ||
observer.polling = true | ||
connectivityObservers.append(observer) |
In theory there could be race conditions here if `subscribe` were to be called from different threads, but I think we can ignore that possibility here.
You're right and I got rid of |
Looks much better, thank you! Just a few more smaller things.
Sources/SwiftGRPC/Core/Channel.swift
Outdated
@@ -73,7 +73,6 @@ public class Channel { | |||
} | |||
|
|||
deinit { | |||
connectivityObservers.forEach { $0.polling = false } |
Please keep this line (see below).
FYI, I think this has not been resolved yet (1).
Sources/SwiftGRPC/Core/Channel.swift
Outdated
} | ||
|
||
init(underlyingChannel: UnsafeMutableRawPointer, callback: @escaping (ConnectivityState) -> ()) { | ||
init(underlyingChannel: UnsafeMutableRawPointer, currentState: ConnectivityState, callback: @escaping (ConnectivityState) -> ()) { |
nit: (ConnectivityState) -> Void
Sources/SwiftGRPC/Core/Channel.swift
Outdated
break spinloop | ||
case .queueTimeout: | ||
continue spinloop | ||
default: |
Print an error here as well and/or `continue` (see above)?
Done.
Sources/SwiftGRPC/Core/Channel.swift
Outdated
guard let underlyingState = self.lastState.underlyingState else { return } | ||
spinloop: while true { | ||
guard let underlyingState = self.lastState.underlyingState else { | ||
print("Couldn't retrieve `underlyingState`") |
nit: I prefer to also include which class printed the error message. How about
print("ERROR: `Channel.ConnectivityObserver.lastState.underlyingState` nil. This should never happen!")
Sources/SwiftGRPC/Core/Channel.swift
Outdated
case .queueShutdown: | ||
break spinloop | ||
case .queueTimeout: | ||
continue spinloop |
Could have just `break` without `spinloop` here to exit the switch; up to you.
You might also get rid of the `spinloop:` loop label by replacing `break spinloop` above with `return`.
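The two alternatives can be compared in a small sketch. `QueueEvent` is a hypothetical enum standing in for the completion-queue event type, and the functions only count handled events instead of calling into gRPC.

```swift
// Hypothetical event type standing in for completion-queue events (assumption).
enum QueueEvent { case stateChanged, queueTimeout, queueShutdown }

// Variant with a loop label, as in the diff under review.
func handleWithLabel(_ events: [QueueEvent]) -> Int {
    var handled = 0
    var iterator = events.makeIterator()
    spinloop: while let event = iterator.next() {
        switch event {
        case .stateChanged:
            handled += 1
        case .queueTimeout:
            continue spinloop   // skip to the next event
        case .queueShutdown:
            break spinloop      // leave the loop, not just the switch
        }
    }
    return handled
}

// Variant without the label: a plain `break` leaves only the switch,
// and `return` ends the loop on shutdown.
func handleWithoutLabel(_ events: [QueueEvent]) -> Int {
    var handled = 0
    var iterator = events.makeIterator()
    while let event = iterator.next() {
        switch event {
        case .stateChanged:
            handled += 1
        case .queueTimeout:
            break               // exits the switch; the loop continues
        case .queueShutdown:
            return handled
        }
    }
    return handled
}

let events: [QueueEvent] = [.stateChanged, .queueTimeout, .stateChanged, .queueShutdown, .stateChanged]
let labeled = handleWithLabel(events)
let unlabeled = handleWithoutLabel(events)
```

Both variants stop at the shutdown event, so the trailing `.stateChanged` is never handled.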
Done.
Sources/SwiftGRPC/Core/Channel.swift
Outdated
|
||
self.lastState = newState | ||
case .queueShutdown: | ||
break spinloop |
Should we call the observer callback with a `nil` argument once we exit the loop, to indicate that no further calls are expected?
I'd say no. If I subscribe to get connectivity state changes, I'd assume I will get a callback only when there is a connectivity state change. If the queue gets shut down, that means, somehow the channel has been deallocated and at that point, I would not expect to get a connectivity state change, especially since it's most likely I'm the one initiating this event.
I also dislike introducing optionals where they're not really needed.
Sources/SwiftGRPC/Core/Channel.swift
Outdated
DispatchQueue.global().async { [weak self] in | ||
let spinloopThreadQueue = DispatchQueue(label: "SwiftGRPC.ConnectivityObserver.run.spinloopThread") | ||
|
||
spinloopThreadQueue.async { [weak self] in | ||
guard let `self` = self else { return } |
This is the line that retains `self` by acquiring a strong reference to it. It does not make sense to try to get rid of it; instead avoid the `[weak self] in guard let self = self` altogether and keep the `forEach { $0.shutdown() }` line above. So I think at the moment your code still leaks threads.
To be honest, I'd like to have a test that spins up 100 observers to ensure all their spinloop threads get spun down once the channel closes (i.e. ensure no leaks are happening), but I can understand if you don't want to add one (should be fairly easy, though).
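The retention point can be checked in isolation with a synchronous sketch. `LoopOwner` is a hypothetical class that reports its own deallocation, and `withExtendedLifetime` stands in for a loop body that keeps using the unwrapped reference; no threads or gRPC calls are involved.

```swift
// Hypothetical object that signals when it is deallocated (assumption).
final class LoopOwner {
    private let onDeinit: () -> Void
    init(onDeinit: @escaping () -> Void) { self.onDeinit = onDeinit }
    deinit { onDeinit() }
}

func runRetentionDemo() -> (deallocatedDuringLoop: Bool, deallocatedAfterLoop: Bool) {
    var deallocated = false
    var owner: LoopOwner? = LoopOwner { deallocated = true }
    var deallocatedDuringLoop = false

    // The closure captures the object weakly, then unwraps it once up front.
    let spin: () -> Void = { [weak weakOwner = owner] in
        guard let strongOwner = weakOwner else { return }
        // Stands in for a spinloop body that keeps using `strongOwner`.
        withExtendedLifetime(strongOwner) {
            owner = nil                          // drop the only outside reference
            deallocatedDuringLoop = deallocated  // still false: the unwrapped reference keeps it alive
        }
    }
    spin()
    return (deallocatedDuringLoop, deallocated)
}

let retention = runRetentionDemo()
```

In this sketch the object outlives its last external owner for as long as the closure body runs, which is why a spinloop that never exits would keep its observer alive unless something else, such as a shutdown event, ends the loop.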
FYI, I think this has not been resolved yet (2).
Sources/SwiftGRPC/Core/Channel.swift
Outdated
let observer = ConnectivityObserver(underlyingChannel: underlyingChannel, callback: callback) | ||
observer.polling = true | ||
public func subscribe(callback: @escaping (ConnectivityState) -> Void) { | ||
let observer = ConnectivityObserver(underlyingChannel: underlyingChannel, currentState: connectivityState(), callback: callback) |
ultra-nit: `observer` could be inlined now.
Sources/SwiftGRPC/Core/Channel.swift
Outdated
} | ||
} | ||
} | ||
} | ||
|
||
private func shutdown() { |
I would have kept this to call in `Channel.shutdown` (see above)
FYI, I think this has not been resolved yet (3).
Sources/SwiftGRPC/Core/Channel.swift
Outdated
default: | ||
break spinloop | ||
print("Event's completion type is `unknown`") |
Please also improve the formulation of this error as above.
To be honest, I'm not a fan of having `print` statements all over the place, especially in libraries. I'd rather not have these in here. I'm going to remove them; feel free to put them back in another PR if you really want them.
Fair enough. For what it's worth, I have a task in #189 to add a proper logging class. At that point, it should become much easier to properly detect and notice such errors (and possibly investigate why they happen).
@MrMage Just in case you missed the last commit.
Sources/SwiftGRPC/Core/Channel.swift
Outdated
@@ -124,41 +124,43 @@ private extension Channel { | |||
} | |||
|
|||
deinit { | |||
completionQueue.shutdown() | |||
shutdown() | |||
} | |||
|
|||
private func run() { | |||
let spinloopThreadQueue = DispatchQueue(label: "SwiftGRPC.ConnectivityObserver.run.spinloopThread") | |||
|
|||
spinloopThreadQueue.async { [weak self] in |
No need to use `[weak self]` here, as mentioned before. Shutdowns are initiated by `Channel`.
LGTM, but please remove `[weak self]` in the block, as it doesn't add value.
Thanks for all the changes!
Done :)
Thanks for adding this! This looks good to me. For reference, I note that https://github.com/grpc/grpc/blob/master/doc/connectivity-semantics-and-api.md says that "Since state changes can be rapid and race with any such notification, the notification should just inform the user that some state change has happened, leaving it to the user to poll the channel for the current state." The Swift callback passes state, but that state is from an immediate call to grpc_channel_check_connectivity_state, so it seems as current as state that might be obtained from a call made within the callback.
Implementation for #186
The goal was to copy the interface that the python client exposes.