umqtt.robust: Fix check_msg blocking after reconnect #509

ian-llewellyn
Contributor

@ian-llewellyn ian-llewellyn commented Jul 18, 2022

After reconnect(), MQTTClient.socket is blocking by default. This
commit aims to supplement that behaviour at times when check_msg()
sets the socket to non-blocking. It may fix errors reported in #102 and
#192.

This fix:

  • avoids using an additional instance attribute to record the intended
    state of the socket (Edit: this would involve changes to umqtt.simple which should 'know' nothing about robustness)
  • only adds two additional lines of code to one file in the codebase (Edit: a new approach was taken based on feedback: it's now 8 additional code lines)
  • depends on socket's __str__() method to retrieve the current timeout
    value: <socket state=0 timeout=-1 incoming=0 off=0> - not ideal
  • (Edit: overrides umqtt.simple's check_msg() method in order to sock.setblocking(False) before all calls to wait_msg(), even after reconnect())

Edit: An additional note: This fix calls super().wait_msg() as opposed to self.wait_msg() to avoid nested reconnect() retries which would simply reintroduce the original behaviour.
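For reference, the `__str__()`-based approach in the third bullet amounts to something like the following. This is a minimal sketch, not the code from the original commit: `timeout_from_repr` is a hypothetical helper, and the only input format assumed is the example string quoted above.

```python
def timeout_from_repr(text):
    """Return the integer after 'timeout=' in a socket repr, or None if absent."""
    for field in text.strip("<>").split():
        key, _, value = field.partition("=")
        if key == "timeout":
            return int(value)
    return None


# The example string is the one quoted in the description above.
example = "<socket state=0 timeout=-1 incoming=0 off=0>"
print(timeout_from_repr(example))  # -1
```

The sketch depends entirely on the exact text a given port prints for a socket, which is why the description calls the approach "not ideal".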

@ian-llewellyn
Contributor Author

I have since read @pfalcon's comment in PR #117:

> guarantee that check_msg() return immediately and can be used in main loops doing something else (otherwise, check_msg() may block your main loop indefinitely)

This fix would contradict that aim. However, publish() could be considered similarly and it is allowed to block the main loop indefinitely.

@andrewleech
Contributor

Hi, that's interesting that the socket timeout is lost after reconnect, I feel that's perhaps an upstream bug that should be addressed.

In the meantime, re-setting the timeout certainly seems appropriate.

Personally I think adding a class attribute to hold the timeout would be better than parsing the str(); the parsing is less likely to work across multiple MicroPython versions and is fairly expensive in terms of CPU cycles.
Caching one more float in a class costs only a small amount of RAM, though I applaud your efforts at minimising code size / change.

@andrewleech
Contributor

> I have since read @pfalcon's comment in PR #117:
>
> > guarantee that check_msg() return immediately and can be used in main loops doing something else (otherwise, check_msg() may block your main loop indefinitely)
>
> This fix would contradict that aim.

I can't see how your change here has affected any behaviour for check_msg? You're just adding to wait_msg but even then there's no new delays?

@ian-llewellyn
Contributor Author

ian-llewellyn commented Jul 19, 2022

Many thanks for the feedback @andrewleech. I've had another look at the code with your comments in mind.

The setblocking() calls are both executed in the umqtt.simple module which knows nothing about "robustness"; storing additional variables in that class, for this purpose seems... improper! Agreed though, the str() parsing approach isn't great.

I looked at a couple of other ways to take care of it and settled on this idea: override check_msg() in umqtt.robust, implement the retry loop in there with self.sock.setblocking(False) as the first call and then return super().wait_msg() in the try: except: (same pattern as the overridden wait_msg method). That eliminates the parsing as per my previous attempt, confines the changes to umqtt.robust, continues the pattern of overriding methods to make them more robust and negates the need to add class variables to 'track' the desired socket state. It's a 7 line override excluding the def line - how do you think that stacks up RAM-usage-wise to the alternative of storing a class variable in umqtt.simple?

My last question is this: Is it best to open a completely new PR for this since the approach is different, or squash this on top of the code you've already seen and force push?

In reply to your second comment:

> I can't see how your change here has affected any behaviour for check_msg? You're just adding to wait_msg but even then there's no new delays?

Yes and no. Without my change, check_msg is unintentionally converted to wait_msg after a reconnect - so that's a bug resulting in check_msg not returning to the main loop immediately. My change intends to address that accidental conversion, but it too will not return to the main loop if it cannot reconnect. (Edit: reconnect() is not in a try/except, so it will return control to the main loop). I think that's okay though: As per the note in robust's README.rst:

> any realistic application would subclass umqtt.robust.MQTTClient class and override delay() and reconnect() methods to suit particular usage scenario.

@andrewleech
Contributor

> The setblocking() calls are both executed in the umqtt.simple module which knows nothing about "robustness"; storing additional variables in that class, for this purpose seems... improper! Agreed though, the str() parsing approach isn't great.
>
> I looked at a couple of other ways to take care of it and settled on this idea: override check_msg() in umqtt.robust, implement the retry loop in there with self.sock.setblocking(False) as the first call and then return super().wait_msg() in the try: except: (same pattern as the overridden wait_msg method). That eliminates the parsing as per my previous attempt, confines the changes to umqtt.robust, continues the pattern of overriding methods to make them more robust and negates the need to add class variables to 'track' the desired socket state. It's a 7 line override excluding the def line - how do you think that stacks up RAM-usage-wise to the alternative of storing a class variable in umqtt.simple?

> Yes and no. Without my change, check_msg is unintentionally converted to wait_msg after a reconnect - so that's a bug resulting in check_msg not returning to the main loop immediately. My change intends to address that accidental conversion, but it too will not return to the main loop if it cannot reconnect. (Edit: reconnect() is not in a try/except, so it will return control to the main loop). I think that's okay though: As per the note in robust's README.rst:
>
> > any realistic application would subclass umqtt.robust.MQTTClient class and override delay() and reconnect() methods to suit particular usage scenario.

Ok, I'm not familiar with the umqtt library myself, so let's map out how it's supposed to work.
In simple.py:

  • wait_msg() relies on the previously set blocking behavior to read the first byte, turns on blocking, then reads the rest of the packet if there was a byte or returns None if there wasn't.
  • check_msg() explicitly turns off blocking, then calls wait_msg(), which will immediately return None if there's nothing already waiting.

An important point here is that by default, TCP sockets are placed in a blocking mode. So on startup the first call to wait_msg() will already be in blocking mode.
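The simple.py behaviour described above can be modelled roughly as follows. This is a simplified sketch, not the library source: `FakeSock` and `SimpleClient` are hypothetical stand-ins, and the fake raises `RuntimeError` where a real blocking read would simply hang.

```python
class FakeSock:
    """Hypothetical stand-in for a TCP socket (not the MicroPython socket API)."""

    def __init__(self, pending=b""):
        self.blocking = True  # new sockets start in blocking mode
        self.pending = pending

    def setblocking(self, flag):
        self.blocking = flag

    def read(self, n):
        if self.pending:
            data, self.pending = self.pending[:n], self.pending[n:]
            return data
        if self.blocking:
            # A real blocking read would hang here; raise so the sketch terminates.
            raise RuntimeError("blocking read with no data would hang")
        return None  # non-blocking and nothing waiting


class SimpleClient:
    """Simplified model of the two methods described above."""

    def __init__(self, sock):
        self.sock = sock

    def wait_msg(self):
        first = self.sock.read(1)    # uses whatever blocking mode is already set
        self.sock.setblocking(True)  # the rest of the packet is read in blocking mode
        if first is None:
            return None
        return first  # the real method would go on to parse the whole packet

    def check_msg(self):
        self.sock.setblocking(False)  # make the first read return immediately
        return self.wait_msg()


client = SimpleClient(FakeSock())
print(client.check_msg())    # None: nothing waiting, returned immediately
print(client.sock.blocking)  # True: wait_msg() switched blocking back on
```

Note that the socket is left in blocking mode after every call, so check_msg() only stays non-blocking because it resets the mode each time it is entered.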

Moving on to robust.py:

  • wait_msg() from simple is wrapped to run in a loop, reconnecting if there was a failure and retrying the underlying wait_msg().
  • The problem you're addressing is that reconnect will re-instate the blocking behaviour and loop forever even when called from check_msg().
  • wait_msg() probably should have a timeout on the loop to not block forever, but that's a separate discussion.
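The problem in the second bullet can be reproduced with a small model. This is a sketch under stated assumptions, not the library source: `FakeSock`, `SimpleClient`, `RobustClient` and `demo` are hypothetical, `reconnect()` is reduced to a single `connect()` call, and a `RuntimeError` marks the point where a real blocking read would hang.

```python
class FakeSock:
    """Hypothetical stand-in for a TCP socket (not the MicroPython socket API)."""

    def __init__(self, broken=False):
        self.blocking = True  # a freshly created socket is blocking
        self.broken = broken

    def setblocking(self, flag):
        self.blocking = flag

    def read(self, n):
        if self.broken:
            raise OSError("connection lost")
        if self.blocking:
            # A real blocking read would hang here; raise so the sketch terminates.
            raise RuntimeError("blocking read with no data would hang")
        return None  # non-blocking and nothing waiting


class SimpleClient:
    def __init__(self):
        self.sock = FakeSock(broken=True)  # start with a dead connection

    def connect(self):
        self.sock = FakeSock()  # new socket, so blocking mode is back on

    def wait_msg(self):
        first = self.sock.read(1)
        self.sock.setblocking(True)
        return first

    def check_msg(self):
        self.sock.setblocking(False)
        return self.wait_msg()


class RobustClient(SimpleClient):
    """Only wait_msg() is wrapped, as in the pre-fix behaviour described above."""

    def reconnect(self):
        self.connect()

    def wait_msg(self):
        while True:
            try:
                return super().wait_msg()
            except OSError:
                self.reconnect()


def demo():
    try:
        RobustClient().check_msg()
    except RuntimeError as exc:
        return str(exc)
    return "returned immediately"


print(demo())  # blocking read with no data would hang
```

The first read fails, reconnect() swaps in a fresh blocking socket, and the retry inside the wait_msg() loop then reads in blocking mode even though the call started in check_msg().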

I understand the root problem now :-)
I think this newer method has merit; however, it still isn't really true to the intended behaviour of the functions, as you suggest. I'll make some review notes in the code.

> My last question is this: Is it best to open a completely new PR for this since the approach is different, or squash this on top of the code you've already seen and force push?

Sorry I didn't get back to you in time, but yes, I commonly replace changes with completely new ones and squash / force-push to GitHub. These GitHub PRs do let you still view earlier changesets, so it's still possible to review them.

Comment on lines 46 to 53
    def check_msg(self):
        while 1:
            self.sock.setblocking(False)
            try:
                return super().wait_msg()
            except OSError as e:
                self.log(False, e)
                self.reconnect()
Contributor

This could be updated to add a new optional arg to def wait_msg(self, blocking=True) and if blocking is False, then

except OSError as e:
   if not blocking:
       raise

which would mean the error would be propagated back up to check_msg() to handle.
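A minimal sketch of how that `blocking` argument could fit together, for illustration only. `FakeSock`, `SimpleClient` and `RobustClient` are hypothetical stand-ins rather than the umqtt classes, and the way check_msg() handles the re-raised error (reconnect once, then return None) is an assumption, since the comment leaves that part open.

```python
class FakeSock:
    """Hypothetical stand-in for a TCP socket (not the MicroPython socket API)."""

    def __init__(self, broken=False):
        self.blocking = True  # new sockets start in blocking mode
        self.broken = broken

    def setblocking(self, flag):
        self.blocking = flag

    def read(self, n):
        if self.broken:
            raise OSError("connection lost")
        if self.blocking:
            # A real blocking read would hang here; raise so the sketch terminates.
            raise RuntimeError("blocking read with no data would hang")
        return None  # non-blocking and nothing waiting


class SimpleClient:
    """Simplified model of the base wait_msg()."""

    def __init__(self):
        self.sock = FakeSock(broken=True)  # start with a dead connection

    def connect(self):
        self.sock = FakeSock()

    def wait_msg(self):
        first = self.sock.read(1)
        self.sock.setblocking(True)
        return first


class RobustClient(SimpleClient):
    reconnects = 0

    def reconnect(self):
        self.reconnects += 1
        self.connect()

    def wait_msg(self, blocking=True):
        while True:
            try:
                return super().wait_msg()
            except OSError:
                if not blocking:
                    raise  # let check_msg() decide what to do
                self.reconnect()

    def check_msg(self):
        # Assumption: reconnect once on failure and report "no message".
        self.sock.setblocking(False)
        try:
            return self.wait_msg(blocking=False)
        except OSError:
            self.reconnect()
            return None


client = RobustClient()
print(client.check_msg(), client.reconnects)  # None 1
```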

Suggested change
-    def check_msg(self):
-        while 1:
-            self.sock.setblocking(False)
-            try:
-                return super().wait_msg()
-            except OSError as e:
-                self.log(False, e)
-                self.reconnect()
+    def check_msg(self):
+        self.sock.setblocking(False)
+        try:
+            return super().wait_msg()
+        except OSError as e:
+            self.log(False, e)
+            self.reconnect()
+        return None

I also don't think check_msg() should be a loop, there's nothing wrong with checking once (in non blocking state), still reconnect on error, but just exit straight away then. This still preserves the intention of the functions I think?

Contributor Author

I don't think there's a right answer to this, it depends to some degree on the application.

When calling a robust check_msg(), my assumption would have been that I can rely on this (more so than umqtt.simple) to deliver me a message if one exists - I believe this is the original author's intention too since check_msg() was not overridden in robust and calls back to the overridden wait_msg() which does try to be more reliable. I'm convinced check_msg() should loop (bear in mind, if there's an exception in reconnect(), it will be raised, so the only occasion where we don't return reasonably quickly to check_msg()'s caller is if reconnect() consistently works and super().wait_msg() consistently fails - a highly unusual situation I'd say).

To help us reach consensus, is it worth considering two applications:

  1. check_msg() is periodically called on a main loop or from within an async co-routine
    In this case, I think your suggestion would work reasonably well; in the event of a reconnect(), messages would be delayed only by the length of a loop. My suggested implementation does spend time trying a little harder, but delivers a message ASAP.
  2. check_msg() is called once having solicited a response from other nodes connected to the MQTT broker (possibly after a short wait for responses)
    In this case, returning from check_msg() having knowingly failed seems inappropriate to me and would need to be clearly stated in its usage documentation "check_msg() should be called at least twice (in what hard-to-detect circumstances?)" Again, my implementation does spend time trying a little harder, but delivers a message first time around if available.

Perhaps it is this specific point that is most concerning, "spend time trying a little harder". That's fair critique if returning quickly takes precedence over being robust for check_msg(). That said, if that's the route we go down, I believe we need to signal the failure in some way, possibly by returning False instead of None on your last line.

Contributor

Yes you make some very good points in that there's no one definitive right answer.

Looking through the suggestions, I now feel like check_msg should retry just once if needed, i.e. call wait_msg, on exception call reconnect, then call wait_msg again.

If it fails again it should still just return as it means the server is probably down.

I definitely believe check_msg should not continue to loop on failure because that will be blocking and prevent the caller from logging, notifying user of failure etc, basically the same as a wait_msg if the server is down.
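The retry-once idea can be sketched as follows. This is an illustration under stated assumptions, not proposed library code: `FakeSock`, `SimpleClient` and `RobustClient` are hypothetical stand-ins, and each `connect()` simply takes the next socket from a list supplied by the caller.

```python
class FakeSock:
    """Hypothetical stand-in for a TCP socket (not the MicroPython socket API)."""

    def __init__(self, broken=False, pending=None):
        self.blocking = True  # new sockets start in blocking mode
        self.broken = broken
        self.pending = pending

    def setblocking(self, flag):
        self.blocking = flag

    def read(self, n):
        if self.broken:
            raise OSError("connection lost")
        if self.pending is not None:
            data, self.pending = self.pending, None
            return data
        if self.blocking:
            # A real blocking read would hang here; raise so the sketch terminates.
            raise RuntimeError("blocking read with no data would hang")
        return None  # non-blocking and nothing waiting


class SimpleClient:
    def __init__(self, socks):
        self._socks = iter(socks)  # sockets handed out by successive connect() calls
        self.connect()

    def connect(self):
        self.sock = next(self._socks)

    def wait_msg(self):
        first = self.sock.read(1)
        self.sock.setblocking(True)
        return first


class RobustClient(SimpleClient):
    reconnects = 0

    def reconnect(self):
        self.reconnects += 1
        self.connect()

    def check_msg(self):
        self.sock.setblocking(False)
        try:
            return super().wait_msg()
        except OSError:
            self.reconnect()
        self.sock.setblocking(False)  # the new socket is blocking again
        try:
            return super().wait_msg()
        except OSError:
            return None  # server is probably down; give up without looping


recovered = RobustClient([FakeSock(broken=True), FakeSock(pending=b"m")])
print(recovered.check_msg())  # b'm'

down = RobustClient([FakeSock(broken=True), FakeSock(broken=True)])
print(down.check_msg(), down.reconnects)  # None 1
```

In this sketch a caller cannot tell "no message" from "gave up", which is the signalling question raised in the previous comment.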

@ian-llewellyn
Contributor Author

@andrewleech, I'm resurrecting our month old conversation! :-)

> check_msg should not continue to loop

So a trivial code change brings your suggestion to fruition while also making check_msg() more 'configurable' for those who want it:

  • By default, check_msg will make a second attempt to check for a message if a reconnect is necessary. If unsuccessful, it will give up and return to the caller.
  • Calling check_msg(attempts=x) sets the number of attempts to be made.
  • Calling check_msg(attempts=-1) instructs check_msg not to return until the connection is re-established and the function is otherwise successful.

What do you think?
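One way the `attempts` behaviour in the three bullets could be implemented is sketched below. The updated diff is not shown in this conversation, so this is an assumption-laden illustration rather than the pushed code: `FakeSock`, `SimpleClient`, `RobustClient` and `broken` are hypothetical, logging is omitted, and `reconnect()` is reduced to taking the next socket from a list.

```python
class FakeSock:
    """Hypothetical stand-in for a TCP socket (not the MicroPython socket API)."""

    def __init__(self, broken=False, pending=None):
        self.blocking = True  # new sockets start in blocking mode
        self.broken = broken
        self.pending = pending

    def setblocking(self, flag):
        self.blocking = flag

    def read(self, n):
        if self.broken:
            raise OSError("connection lost")
        if self.pending is not None:
            data, self.pending = self.pending, None
            return data
        if self.blocking:
            # A real blocking read would hang here; raise so the sketch terminates.
            raise RuntimeError("blocking read with no data would hang")
        return None  # non-blocking and nothing waiting


class SimpleClient:
    def __init__(self, socks):
        self._socks = iter(socks)  # sockets handed out by successive connect() calls
        self.connect()

    def connect(self):
        self.sock = next(self._socks)

    def wait_msg(self):
        first = self.sock.read(1)
        self.sock.setblocking(True)
        return first


class RobustClient(SimpleClient):
    reconnects = 0

    def reconnect(self):
        self.reconnects += 1
        self.connect()

    def check_msg(self, attempts=2):
        # A negative count never reaches zero, so attempts=-1 retries indefinitely.
        while attempts:
            self.sock.setblocking(False)  # redo this after every reconnect
            try:
                return super().wait_msg()
            except OSError:
                pass
            self.reconnect()
            attempts -= 1
        return None  # gave up


def broken(n):
    """Return n hypothetical dead sockets."""
    return [FakeSock(broken=True) for _ in range(n)]


default = RobustClient(broken(3))
print(default.check_msg(), default.reconnects)  # None 2

forever = RobustClient(broken(3) + [FakeSock(pending=b"m")])
print(forever.check_msg(attempts=-1), forever.reconnects)  # b'm' 3
```

With this shape, a reconnect also happens after the final failed attempt, so the next call starts on a fresh connection.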

@andrewleech
Contributor

@ian-llewellyn This does look quite good to me, nice cleanup. Keeps it all quite simple and I feel true-to-intention.

@ian-llewellyn ian-llewellyn force-pushed the fix_umqtt_robust_check_msg_blocking_after_reconnect branch from 28cdd08 to bcf6d97 Compare September 27, 2022 09:43
@ian-llewellyn
Contributor Author

Thanks for the positive discourse @andrewleech. I saw that master has moved on a bit since my initial PR, so I've rebased, updated manifest.py and force pushed. Hopefully this is now ready for a final review and merge. 🤞

@andrewleech
Contributor

Looks good to me, I'll have to leave it to @jimmo for a final review.

@dpgeorge
Member

dpgeorge commented Oct 4, 2022

Thanks for the contribution. There is definitely a bug here, check_msg() shouldn't block.

The fix here looks good. Rebased and merged in 4dc2d5e

@dpgeorge dpgeorge closed this Oct 4, 2022