implement resubscribing to all subscribed topics on reconnect #273

Open
vtolstov opened this issue Jun 23, 2019 · 8 comments

@vtolstov

As I understand it, after reconnecting to nats-streaming-server we need to resubscribe to all previously subscribed topics. Is that right?
If so, I think the proper place for this is the stan.go client: if it is handled in the application, I need to store topic and subscription details in every application that uses nats-streaming.

@aricart
Member

aricart commented Jun 24, 2019

@vtolstov I want to clarify a few things first.

  • When your client connects you provide a client name. The server tracks this client name, to ensure that the client is not connected more than once on the server.

  • If you simply create subscriptions, and you disconnect, the server doesn't store any client subscription state. So when you reconnect you'll have to specify subscription options to start reading the streaming log where you left off.

  • If you create a durable subscription, the server will remember your client name and the durable name, so it will resume subscriptions (when you re-issue a Subscribe) where you left off. You do have to remember the stream's subject and your durable name.

More precisely, you have to resubscribe when the connection to NATS is closed (the closed handler is called) or the connection to the STAN server is lost (the connection lost handler is called). While the server can remember where you were on the stream, the client cannot 'resume' where it was without resubscribing and setting up again the inboxes where it receives new messages.

With that said, if the NATS client connection is disrupted temporarily, on reconnect the NATS client will resend all current subscriptions to the NATS server. Internally, the streaming client sends PINGs to the streaming server. If the allowed number of missed PINGs is exceeded, the STAN connection is deemed permanently lost and your connection lost handler is called.

So in short, yes, you do have to remember:

  • Your client id
  • Subjects to all the streams (and any associated durable name)

If you are not using durables, then your application is likely very specific about what it wants to see, so you'll already know what options to provide. In many situations a durable queue subscription is your best bet: you can simply start a client and your subscription is load balanced across all the members of the queue group. If one client goes down from the perspective of the server, the others can continue processing the queue. The old client can rejoin once the connection is restored, and handle requests that have not been acknowledged.

@kozlovic
Member

In Streaming there are two levels of connection, if you will: the low-level NATS connection that allows sending and receiving messages, and the streaming (or STAN) connection, which is more of a logical connection to a streaming server.

The low-level NATS connection may break and reconnect without your streaming application knowing or caring about it. While the NATS connection is broken your application may not receive messages, hence the AckWait() that allows the server to resend (the server itself does not know whether your application received a message, hence the need for ACKs). Likewise, a publish call may time out because it did not receive a confirmation from the server while the TCP connection was broken.

However, if the Streaming connection is reported lost, then the server has removed the connection and its non-durable subscriptions. As of now, this is a terminal event in the streaming library and no reconnection logic is done for the user.

For non-queue subscriptions, it would be possible for the streaming library to try to reconnect and re-create non-durable subscriptions based on the lowest non-ack'ed sequence. Suppose your sub received messages 1 to 10 but, using manual acknowledgment mode, ack'ed only 1, 2, 3, 4, 6 and 8. The library would create the sub starting at 5, which is the first non-ack'ed message. Your application would then receive duplicates (6 and 8), and since this would be a brand new subscription from the server's perspective, they would all be marked as non-redelivered. The other approach would be to simply re-create the subscription based on the last sequence seen by the library (which in the above example would be 10).
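The two candidate restart positions from this example can be worked through in a few lines. This is a sketch of the arithmetic only, not something the streaming library does today; `firstUnacked`, `lastSeen` and `duplicatesFrom` are hypothetical helpers, and `received` is assumed to be in ascending order.

```go
package main

import "fmt"

// firstUnacked returns the lowest sequence in received (ascending) that is
// not in acked. If every received sequence is acked, it returns the last
// received sequence plus one.
func firstUnacked(received []uint64, acked map[uint64]bool) uint64 {
	var last uint64
	for _, seq := range received {
		if !acked[seq] {
			return seq
		}
		last = seq
	}
	return last + 1
}

// lastSeen returns the highest sequence in received, or 0 if it is empty.
func lastSeen(received []uint64) uint64 {
	var max uint64
	for _, seq := range received {
		if seq > max {
			max = seq
		}
	}
	return max
}

// duplicatesFrom lists already-acked sequences at or after start; these are
// the messages the application would see again after restarting at start.
func duplicatesFrom(start uint64, received []uint64, acked map[uint64]bool) []uint64 {
	var dups []uint64
	for _, seq := range received {
		if seq >= start && acked[seq] {
			dups = append(dups, seq)
		}
	}
	return dups
}

func main() {
	received := []uint64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
	acked := map[uint64]bool{1: true, 2: true, 3: true, 4: true, 6: true, 8: true}

	start := firstUnacked(received, acked)
	fmt.Println("first non-acked:", start)                               // first non-acked: 5
	fmt.Println("duplicates:", duplicatesFrom(start, received, acked)) // duplicates: [6 8]
	fmt.Println("last seen:", lastSeen(received))                       // last seen: 10
}
```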

For queue subscriptions, the problem is that gaps in sequence are expected if there is more than one member in the same group. If a queue group is not empty, the start sequence is ignored by the server and the member would start to receive new or redelivered messages for the group, which is ok. But if it was the last member and is now re-created, the start sequence will take effect, and that start sequence could be way off compared to what the group had received as a whole. Suppose that there were several members and one member received messages 1, 3, and 5 and then was somehow partitioned, while other members received messages 6 to 100. Then those members went away and the first member "reconnects": the start sequence would be 5+1 (since it last received 5), which means that this member would process all the other messages that the group had already processed. This is still "valid" since streaming offers at-least-once semantics.
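The queue-group scenario above can be modeled with simple arithmetic. This is a deliberately simplified model of the behavior described in this comment, not the server's implementation: `effectiveStart` and `reprocessed` are hypothetical, and `groupNext` (the next sequence the group would deliver) ignores redeliveries.

```go
package main

import "fmt"

// effectiveStart models the rule described above: while the queue group
// still has members, the requested start sequence is ignored; if the group
// is empty, the requested start takes effect.
func effectiveStart(groupHasMembers bool, requestedStart, groupNext uint64) uint64 {
	if groupHasMembers {
		return groupNext
	}
	return requestedStart
}

// reprocessed counts the messages in [start, groupLast] that the group had
// already handled and that the re-created member would process again.
func reprocessed(start, groupLast uint64) uint64 {
	if start > groupLast {
		return 0
	}
	return groupLast - start + 1
}

func main() {
	const memberLast = 5  // partitioned member last received message 5
	const groupLast = 100 // the rest of the group processed up to 100

	// The other members went away, so the group is empty on "reconnect".
	start := effectiveStart(false, memberLast+1, groupLast+1)
	fmt.Println("start:", start, "reprocessed:", reprocessed(start, groupLast)) // start: 6 reprocessed: 95

	// Had the group still had members, the requested start would be ignored.
	start = effectiveStart(true, memberLast+1, groupLast+1)
	fmt.Println("start:", start, "reprocessed:", reprocessed(start, groupLast)) // start: 101 reprocessed: 0
}
```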

These are the reasons why, as of now, the streaming library does not handle reconnection. This could change in the future though.

@vtolstov
Author

Thank you for the detailed answers. I want to modify the go-micro stan broker plugin (I maintain it, and the nats broker too). What do I need to do if I want to start all subscriptions from the last received message?

@kozlovic
Member

Sorry for the long delay. If the subscriptions are all durable, then you don't have to worry about the start position since it would be ignored by the server anyway. If you have non-durable subscriptions, you would need to keep track of the last received sequence in your message callback and use that as the start sequence when recreating the subscriptions on reconnect.
We may consider doing this in the library, but there are no immediate plans.

@chaitanyav

@kozlovic Thanks for the detailed explanation. I now understand what it means when I get the message 'stan connection closed'.

@alifpay

alifpay commented Jul 30, 2020

Hi

Is this correct for reconnect?

func reConn() {
	var err error
	sc, err = stan.Connect("testClient", "durable2", stan.NatsConn(nc),
		stan.SetConnectionLostHandler(func(_ stan.Conn, reason error) {
			log.Printf("Connection lost, reason: %v , reconnecting \n", reason)
			time.Sleep(15 * time.Second)
			reConn()
		}))
	if err != nil {
		log.Println("stan.Connect", err)
	}

	msgHandler := func(msg *stan.Msg) {
		saveMsg(msg.Data)
	}
	_, err = sc.QueueSubscribe("test.pay", "test", msgHandler, stan.DurableName("nats-test-1"))
	if err != nil {
		sc.Close()
		log.Println("QueueSubscribe", err)
	}
}

@kozlovic
Member

That is a start. You probably have to handle failures anywhere in the connect/subscription-creation process, which is no different than on initial setup; it's just that I don't see where your application really handles the error (you print, but continue). For instance, if the connect fails, you still attempt to create the subscription, which will fail, and again you print but don't actually fail or retry, etc.
The other thing you need to be aware of is that sc looks like a global, so if you are using it outside of this function, you may need some protection: the connection lost handler is invoked from a library goroutine, so you may then have concurrent access to sc.

@alifpay

alifpay commented Aug 3, 2020

Yes, you are right, I declared sc as a global.
Thank you, I will add protection.
