-
Notifications
You must be signed in to change notification settings - Fork 699
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Updates to KV Watcher. #900
Conversation
1. If the underlying stream was very busy pending might not go to zero, so snapshot and mark initial state done when received >= init pending or delta == 0 2. When a watcher was cancelled with a context we would not signal to allow and range w.Updates() calls to exit. 3. General flow control checks were timing sensitive after the initial checks, needed to be 2x HB interval always, not just beginning. Signed-off-by: Derek Collison <derek@nats.io>
Signed-off-by: Derek Collison <derek@nats.io>
3987a74
to
e6ee279
Compare
nats.go
Outdated
@@ -3944,6 +3946,13 @@ func (nc *Conn) removeSub(s *Subscription) { | |||
if s.pCond != nil { | |||
s.pCond.Broadcast() | |||
} | |||
|
|||
// Check for watchers. | |||
if jsi != nil && jsi.w != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not enough to guarantee that waitForMsgs() is not invoking the message callback at this time, or about to do it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yea I am reworking this as we speak since the large KINE test failed on send to closed chan.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok I reworked it, please take another look.
Signed-off-by: Derek Collison <derek@nats.io>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
This is related to #900 that introduced the change of discard policy if connecting to a server v2.7.2+. The CreateKeyValue() API calls AddStream(). If the stream already exists and the configuration is identical, the call succeeds (idempotent). However, since we now try to set the discard policy to "new" when connecting to a server v2.7.2+, the call will fail if the KV store already existed (CreateKeyValue() was called with a v2.7.1 server and stream was created with DiscardOld). The approach here is that if AddStream() fails with an "already in use" error, then we will lookup the stream info, and if we detect that the configuration is same (except for the discard policy), then we call UpdateStream() with the new discard policy. The problematic part is that the client side configuration does not set some of the fields (or their value is 0), but then the server sets either some defaults (like Duplicates set to 2min) or replaces 0 to -1. So the info we get back and the config we have need to be tweaked before being compared. This is really hacky and prone to break if server were to change some defaults. Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This is related to #900 that introduced the change of discard policy if connecting to a server v2.7.2+. The CreateKeyValue() API calls AddStream(). If the stream already exists and the configuration is identical, the call succeeds (idempotent). However, since we now try to set the discard policy to "new" when connecting to a server v2.7.2+, the call will fail if the KV store already existed (CreateKeyValue() was called with a v2.7.1 server and stream was created with DiscardOld). The approach here is that if AddStream() fails with an "already in use" error, then we will lookup the stream info, and if we detect that the configuration is same (except for the discard policy), then we call UpdateStream() with the new discard policy. The problematic part is that the client side configuration does not set some of the fields (or their value is 0), but then the server sets either some defaults (like Duplicates set to 2min) or replaces 0 to -1. So the info we get back and the config we have need to be tweaked before being compared. This is really hacky and prone to break if server were to change some defaults. Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
Signed-off-by: Derek Collison derek@nats.io