Improved recovery on durables and ordered #865
# Conflicts: # src/NATS.Client/Connection.cs
# Conflicts: # README.md
}

return $"DUR{seconds}s{millis}ms";
}
Added stringify for better debugging.
@@ -196,6 +196,7 @@ public static class JetStreamConstants
public const string ExceededMaxRequestMaxBytes = "Exceeded MaxRequestMaxBytes"; // 409

public const string BatchCompleted = "Batch Completed"; // 409 informational
public const string ServerShutdown = "Server Shutdown"; // 409 informational with headers
While I was here: this status is a recent addition to the server.
@@ -19,7 +19,7 @@
namespace NATS.Client.JetStream
{
internal interface SimplifiedSubscriptionMaker {
IJetStreamSubscription Subscribe(EventHandler<MsgHandlerEventArgs> handler = null);
IJetStreamSubscription Subscribe(EventHandler<MsgHandlerEventArgs> handler , PullMessageManager optionalPmm, long? optionalInactiveThreshold);
Internal interface change.
ConsumerName = ci.Name;
unorderedBindPso = PullSubscribeOptions.BindTo(streamCtx.StreamName, ci.Name);
unorderedBindPso = PullSubscribeOptions.FastBindTo(streamCtx.StreamName, ci.Name);
FastBind is essentially an unsafe, you-are-on-your-own bind. That is fine for simplification, since it is only used after the consumer context has already been obtained.
@@ -73,30 +73,31 @@ internal ConsumerContext(StreamContext sc, OrderedConsumerConfiguration config)
.WithReplayPolicy(config.ReplayPolicy)
.WithHeadersOnly(config.HeadersOnly)
.Build();
subscribeSubject = originalOrderedCc.FilterSubject;
subscribeSubject = Validator.ValidateSubject(originalOrderedCc.FilterSubject, false);
This needed validation; otherwise it would have failed slowly (at the server).
PullSubscribeOptions pso;
if (ordered) {
if (lastConsumer != null) {
highestSeq = Math.Max(highestSeq, lastConsumer.pmm.LastStreamSeq);
}
ConsumerConfiguration cc = lastConsumer == null
? originalOrderedCc
: streamCtx.js.NextOrderedConsumerConfiguration(originalOrderedCc, highestSeq, null);
: streamCtx.js.ConsumerConfigurationForOrdered(originalOrderedCc, highestSeq, null, null, null);
The name and signature have changed.
public IJetStreamSubscription Subscribe(EventHandler<MsgHandlerEventArgs> handler = null) {

public IJetStreamSubscription Subscribe(EventHandler<MsgHandlerEventArgs> messageHandler, PullMessageManager optionalPmm, long? optionalInactiveThreshold) {
Interface changed
if (handler == null) {
return (JetStreamPullSubscription)streamCtx.js.PullSubscribe(subscribeSubject, pso);
if (messageHandler == null) {
return (JetStreamPullSubscription) streamCtx.js.CreateSubscription(subscribeSubject, null, pso, null, null, false, optionalPmm);
I have to create this subscription manually because I need to pass the extra optionalPmm.
}
return (JetStreamPullAsyncSubscription)streamCtx.js.PullSubscribeAsync(subscribeSubject, handler, pso);

return (JetStreamPullAsyncSubscription)streamCtx.js.CreateSubscription(subscribeSubject, null, pso, null, messageHandler, false, optionalPmm);
Did this one manually also
@@ -130,38 +134,58 @@ public ConsumerInfo GetCachedConsumerInfo()

public Msg Next(int maxWaitMillis = DefaultExpiresInMillis)
{
lock (stateLock)
if (maxWaitMillis < MinExpiresMills)
validate before locking
}
catch (NATSTimeoutException)
catch (Exception)
On any exception, clean up and return null. Simplification is different here: I try to return null instead of throwing the timeout exception.
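A minimal sketch of the control flow discussed in the two comments above (validate before locking, then return null on any failure). `NextSketch`, `fetchOne`, the `string` message type, the `ArgumentException`, and the 1000 ms minimum are hypothetical stand-ins for illustration, not the library's actual types or values.

```csharp
using System;

// Hypothetical stand-in for the consumer context; only the control flow matters.
public class NextSketch
{
    public const int MinExpiresMillis = 1000; // assumed minimum, for illustration only

    private readonly object stateLock = new object();
    private readonly Func<int, string> fetchOne; // hypothetical: performs the one-message pull

    public NextSketch(Func<int, string> fetchOne)
    {
        this.fetchOne = fetchOne;
    }

    public string Next(int maxWaitMillis)
    {
        // Validate before taking the lock, so bad input never contends with other callers.
        if (maxWaitMillis < MinExpiresMillis)
        {
            throw new ArgumentException($"Max wait must be at least {MinExpiresMillis} milliseconds.");
        }

        lock (stateLock)
        {
            try
            {
                return fetchOne(maxWaitMillis);
            }
            catch (Exception)
            {
                // Any failure, a timeout included, is reported as "no message".
                return null;
            }
        }
    }
}
```

With this shape, callers check for null rather than wrapping every call in a try/catch for the timeout case.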
private readonly string pullSubject;
private Stopwatch sw;
private long startTicks;
In the Java version I moved to nanos just to have a little finer control of the timing. Nanos aren't available here, but ticks are, and they are finer than millis.
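For reference, a .NET tick is 100 nanoseconds, so there are `TimeSpan.TicksPerMillisecond` (10,000) ticks in a millisecond. The sketch below shows one way a tick-based deadline check could look; `TickTimerSketch` and `RemainingTicks` are hypothetical names, and where the PR actually reads its tick values from is not shown in the diff.

```csharp
using System;

public static class TickTimerSketch
{
    // Ticks left before the wait expires, clamped so it never goes negative.
    public static long RemainingTicks(long startTicks, long maxWaitTicks, long nowTicks)
    {
        long elapsed = nowTicks - startTicks;
        return Math.Max(0, maxWaitTicks - elapsed);
    }
}
```

A caller might capture `long startTicks = DateTime.UtcNow.Ticks;` once (an assumption about the tick source) and pass the current `DateTime.UtcNow.Ticks` on each check.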
long expiresInMillis = fetchConsumeOptions.ExpiresInMillis;
maxWaitTicks = expiresInMillis * TimeSpan.TicksPerMillisecond;

long inactiveThreshold = expiresInMillis * 110 / 100; // ten % longer than the wait
I want the threshold to be longer than the wait so it doesn't expire in the middle, even though it shouldn't because there is a pull active.
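To make the arithmetic concrete, here is a small sketch of the two calculations from the hunk above, wrapped in hypothetical helper methods (`FetchTimingSketch` is not part of the PR). Multiplying by 110 before dividing by 100 keeps the calculation in integer math without losing the ten percent.

```csharp
using System;

public static class FetchTimingSketch
{
    // Convert the expiry in milliseconds to ticks (1 tick = 100 ns).
    public static long MaxWaitTicks(long expiresInMillis)
    {
        return expiresInMillis * TimeSpan.TicksPerMillisecond;
    }

    // Ten percent longer than the wait, using integer arithmetic.
    public static long InactiveThresholdMillis(long expiresInMillis)
    {
        return expiresInMillis * 110 / 100;
    }
}
```

For a 30,000 ms expiry this gives 300,000,000 ticks of wait and a 33,000 ms inactive threshold.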
/// </summary>
/// <returns>the consumer name</returns>
string ConsumerName { get; }
Moved down into the base interface since multiple subclasses now provide this. Before, this was the only class that did. Ordered, for instance, did not provide a name, but now it does, and the name is tracked.
/// backed by Jetstream streams and consumers, using the same connection and JetStreamOptions as the management.
/// </summary>
/// <returns>The JetStream Context</returns>
IJetStream GetJetStreamContext();
I was tired of getting the JetStream Context from the connection when I could just get it from the JSM. Not really necessary, I just wanted to do it and now use it all the time.
@@ -21,7 +21,7 @@ internal class IterableConsumer : MessageConsumer, IIterableConsumer
internal IterableConsumer(SimplifiedSubscriptionMaker subscriptionMaker,
ConsumeOptions consumeOptions,
ConsumerInfo cachedConsumerInfo)
: base(subscriptionMaker, consumeOptions, cachedConsumerInfo, null) {}
: base(subscriptionMaker, cachedConsumerInfo, consumeOptions, null) {}
Changed parameter ordering to match Java.
else
{
mm = pmmInstance;
}
This code reuses an existing message manager, since it holds state that I don't want to lose. Before, it would always just make a new one.
}
else if (sub is JetStreamAbstractAsyncSubscription asyncSub)
{
asyncSub.UpdateConsumer(ci.Name);
asyncSub.SetConsumerName(ci.Name);
Naming is hard.
@@ -554,71 +564,71 @@ public IJetStreamPullSubscription PullSubscribe(string subject, PullSubscribeOpt
{
subject = ValidateSubject(subject, false);
ValidateNotNull(options, "Pull Subscribe Options");
return (IJetStreamPullSubscription) CreateSubscription(subject, null, options, null, null, false);
return (IJetStreamPullSubscription) CreateSubscription(subject, null, options, null, null, false, null);
The CreateSubscription signature changed to accept a message manager. All these older API calls just create new subscription instances, so they don't have a message manager yet.
_consumerCreate290Available = jetStreamBase._consumerCreate290Available;
_multipleSubjectFilter210Available = jetStreamBase._multipleSubjectFilter210Available;
}
This is the code to support getting a JetStream context from JetStream Management.
builder.WithInactiveThreshold(inactiveThreshold.Value);
}

return builder.Build();
This builder logic changed to handle optional parameters.
thresholdMessages = bm - rePullMessages;
thresholdBytes = bb == 0 ? int.MinValue : bb - rePullBytes;

DoSub();
Logic changed - don't create the PullRequestOptions up front since they won't be exactly the same after a recovery.
catch (Exception) {
SetupHbAlarmToTrigger();
}
}
On a heartbeat error you never know why it happened, so just clean up and re-subscribe. If the cleanup itself fails, set the alarm to trigger, which eventually ends up right back here; in the meantime, the server, connection, stream, or consumer might have been fixed. Or it might not, and this will just keep circling. It does not block, though, because the alarm runs on a separate thread/task.
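The retry cycle described in this comment can be sketched roughly as follows. This is an illustration under stated assumptions, not the PR's implementation: `HeartbeatRecoverySketch`, the `resubscribe` delegate, and the use of `System.Threading.Timer` as the alarm are all hypothetical.

```csharp
using System;
using System.Threading;

// Hypothetical sketch: try to recover; on failure, arm a one-shot alarm that retries later.
public class HeartbeatRecoverySketch : IDisposable
{
    private readonly Action resubscribe; // hypothetical: cleans up and re-subscribes
    private readonly int alarmMillis;
    private readonly Timer alarm;

    public HeartbeatRecoverySketch(Action resubscribe, int alarmMillis)
    {
        this.resubscribe = resubscribe;
        this.alarmMillis = alarmMillis;
        // Created disarmed; the callback runs on a thread-pool thread, so nothing blocks.
        alarm = new Timer(_ => HandleHeartbeatError(), null, Timeout.Infinite, Timeout.Infinite);
    }

    public void HandleHeartbeatError()
    {
        try
        {
            resubscribe();
        }
        catch (Exception)
        {
            // Recovery failed; come back here after the alarm fires.
            SetupAlarmToTrigger();
        }
    }

    private void SetupAlarmToTrigger()
    {
        alarm.Change(alarmMillis, Timeout.Infinite); // one-shot: fire once, no period
    }

    public void Dispose()
    {
        alarm.Dispose();
    }
}
```

The loop continues for as long as `resubscribe` keeps throwing, and ends on the first attempt that succeeds.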
}
catch (Exception)
{
SetupHbAlarmToTrigger();
On an exception, just try to start over later.
try
{
base.InitSub(subscriptionMaker.Subscribe(mh, pmm, null));
Repull();
Repull also handles the first time.
@@ -32,7 +32,7 @@ public abstract class MessageManager

internal ulong LastStreamSeq;
internal ulong LastConsumerSeq;
internal long LastMsgReceived;
internal InterlockedLong LastMsgReceived;
Interlocked because multiple threads may read the value.
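As a rough illustration of why a wrapper helps here: a plain `long` read is not guaranteed to be atomic on 32-bit platforms, while `Interlocked.Read` and `Interlocked.Exchange` are. The class below is a hypothetical sketch; the library's own `InterlockedLong` may expose a different API.

```csharp
using System.Threading;

// Hypothetical sketch of a thread-safe long; not the library's InterlockedLong.
public class InterlockedLongSketch
{
    private long value;

    public InterlockedLongSketch(long initial = 0)
    {
        value = initial;
    }

    // Atomic 64-bit read, safe even on 32-bit platforms.
    public long Read()
    {
        return Interlocked.Read(ref value);
    }

    // Atomic write; the previous value is discarded.
    public void Set(long newValue)
    {
        Interlocked.Exchange(ref value, newValue);
    }

    // Atomic increment; returns the incremented value.
    public long Increment()
    {
        return Interlocked.Increment(ref value);
    }
}
```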
internal override void HandleHeartbeatError()
{
base.HandleHeartbeatError();
HandleErrorCondition();
Extracted so it can be reused from other places.
}
NATSException ne = e is NATSException ? (NATSException)e : new NATSException("Ordered Subscription Error", e);
((Connection)Js.Conn).ScheduleErrorEvent(this, ne);
InitOrResetHeartbeatTimer();
Again, don't die on exceptions; just reset the timer, which will eventually alarm.
private void TrackPending(int m, long b)
private void TrackIncoming(int m, long b)
Better name, slightly tweaked implementation.
if (pullManagerObserver != null)
{
pullManagerObserver.HeartbeatError();
}
If the incoming sequence is wrong, my current expectations are invalid. Since I stop tracking the message, the alarm will trigger and redo the sub.
@@ -21,14 +21,31 @@ public sealed class PullSubscribeOptions : SubscribeOptions
private PullSubscribeOptions(ISubscribeOptionsBuilder builder) : base(builder, true, null, null) {}

/// <summary>
/// Create PushSubscribeOptions where you are binding to
/// Create PullSubscribeOptions where you are binding to
Fixed a copy/paste error.
@@ -61,7 +61,7 @@ protected override bool BeforeChannelAddCheck(Msg msg)
{
if (Hb)
{
MessageReceived(); // only need to track when heartbeats are expected
UpdateLastMessageReceived(); // only need to track when heartbeats are expected
naming
", " + ConsumerConfiguration.ToJsonNode().ToString() + | ||
'}'; | ||
} | ||
|
for debugging.
namespace NATSExamples
{
public class ChaosCommandLine
This whole Chaos app is what I built to test all these changes against real servers. It's just easier to simulate failure against a real cluster in order to work through and validate recovery. The app now also exists as sample code.
LGTM
(One nit, if you want to take it and if I understood the code correctly: in tests, use Stopwatch instead of DateTime.Now to measure how long an operation takes.)
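A minimal sketch of the suggested pattern. `ElapsedSketch` and `MeasureMillis` are hypothetical helper names; `Stopwatch` is preferred because it is monotonic, while `DateTime.Now` can jump with clock adjustments.

```csharp
using System;
using System.Diagnostics;

public static class ElapsedSketch
{
    // Runs the operation and returns how long it took, in milliseconds.
    public static long MeasureMillis(Action operation)
    {
        Stopwatch sw = Stopwatch.StartNew();
        operation();
        sw.Stop();
        return sw.ElapsedMilliseconds;
    }
}
```

In a test this might be used as `long elapsed = ElapsedSketch.MeasureMillis(() => consumer.Next(1000));`, where `consumer` is whatever object is under test.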
Improved recovery for durables and ordered, including push, pull and simplification.