fix: queue never gets released when is full #94
CI is failing and the change is likely not correct. I would recommend adding a new unit test to demonstrate the problem first.
@mcollina Sure, I'm working on it
@mcollina Done
@mcollina I also think we can drop
@mcollina ready for review
This fix seems necessary only if a user is not calling the given callback, which is a bug in the user's code, not in mqemitter.
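To make the point about the callback concrete, here is a minimal sketch of how a listener that never calls its callback can stall a bounded queue. `BoundedEmitter` and `run` are hypothetical names for a toy model; this is not mqemitter's implementation.

```javascript
// Hypothetical toy model, NOT mqemitter's actual code.
class BoundedEmitter {
  constructor (concurrency) {
    this.concurrency = concurrency
    this.current = 0   // messages currently being delivered
    this.queue = []    // messages waiting for a free slot
    this.listener = null
  }

  emit (message) {
    if (this.current >= this.concurrency) {
      this.queue.push(message) // limit reached: wait for a slot
      return
    }
    this.current++
    // The slot is only freed when the listener calls its callback.
    this.listener(message, () => this.released())
  }

  released () {
    this.current--
    if (this.queue.length > 0) this.emit(this.queue.shift())
  }
}

// Emits three messages with concurrency 1 and reports what happened.
function run (behaviour) {
  const e = new BoundedEmitter(1)
  const delivered = []
  e.listener = (message, done) => {
    delivered.push(message)
    behaviour(done)
  }
  e.emit('a')
  e.emit('b')
  e.emit('c')
  return { delivered, queued: e.queue.length }
}

const wellBehaved = run(done => done()) // always calls the callback
const buggy = run(() => {})             // never calls the callback

console.log(wellBehaved) // all three messages delivered, nothing queued
console.log(buggy)       // only 'a' delivered, 'b' and 'c' stay queued
```

In this model the queue stays full only because the listener broke the contract, which is the situation the comment attributes to user code.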
for (let i = 0; i < 9; i++) {
  e._messageQueue.push({ topic: 'hello 1' })
  e._messageCallbacks.push(onSent)
Write a test without using private properties. I still do not understand what this is doing and my understanding of it is that it is not correct.
This is related to:
moscajs/aedes#553
and maybe moscajs/aedes#666
I'm not able to reproduce the error without using private fields. BTW, this is a safe approach to ensure the queue is always released when full.
That's the point of the limit. Once it is full, wait for one of the slots to be free.
This is equivalent to disabling the limit.
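The intended behaviour of the limit can be sketched as follows: once every slot is taken, new work waits, and it starts only when a running task releases its slot. `createLimiter` is a hypothetical helper written for illustration, not part of mqemitter or fastparallel.

```javascript
// Hypothetical sketch of a concurrency limit; all names are illustrative.
function createLimiter (limit) {
  let inFlight = 0
  const waiting = []

  function next () {
    while (inFlight < limit && waiting.length > 0) {
      const task = waiting.shift()
      inFlight++
      task(function release () {
        inFlight--
        next() // a slot is free again: start the next waiting task
      })
    }
  }

  return {
    push (task) {
      waiting.push(task)
      next()
    },
    stats () {
      return { inFlight, waiting: waiting.length }
    }
  }
}

const limiter = createLimiter(2)
const releases = []

// Five tasks that hold their slot until we release them by hand.
for (let i = 0; i < 5; i++) {
  limiter.push(release => releases.push(release))
}

const whenFull = limiter.stats()     // two running, three waiting
releases[0]()                        // one task finishes
const afterRelease = limiter.stats() // two running again, two waiting
```

Bypassing the wait whenever the queue is full would let all five tasks start at once, which is why the comment equates that with disabling the limit.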
> That's the point of the limit. Once it is full, wait for one of the slots to be free.
I know that, and it still works that way, but for some reason it seems the queue can get full and never be released. That is why I added the doing flag, which handles this specific case. If doing is true, fastparallel is doing its job and will automatically call release when done (so the concurrency is still respected as before). If doing is false and we add a message to the queue, that message will never be processed, because there is no active parallel job that can pick up new messages once it finishes, and the whole process stays stuck forever.
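One possible reading of the guard described in this comment is sketched below. It is an assumption-laden toy model, not the PR's actual diff: `GuardedQueue`, `stuckScenario`, and the way the stuck state is forced by writing to `current` are all hypothetical, and the listener is assumed to call back synchronously.

```javascript
// Hypothetical model of the described guard, NOT the code from the PR.
class GuardedQueue {
  constructor (guard) {
    this.guard = guard     // enable the extra release when nothing is running
    this.concurrency = 1
    this.current = 0       // slots believed to be in use
    this.doing = false     // true only while a delivery is running
    this.queue = []
    this.delivered = []
  }

  emit (message) {
    if (this.current >= this.concurrency) {
      this.queue.push(message)
      // Guard: queue is full but no delivery is active, so nobody
      // would otherwise call release() for this message.
      if (this.guard && !this.doing) this.release()
      return
    }
    this.current++
    this.deliver(message)
  }

  deliver (message) {
    this.doing = true
    this.delivered.push(message) // listener runs synchronously in this sketch
    this.doing = false
    this.release()
  }

  release () {
    this.current--
    if (this.queue.length > 0) {
      this.current++
      this.deliver(this.queue.shift())
    }
  }
}

function stuckScenario (guard) {
  const q = new GuardedQueue(guard)
  q.current = q.concurrency // assumption: simulate a slot whose release was lost
  q.emit('hello')
  return { delivered: q.delivered, queued: q.queue.length }
}

const withoutGuard = stuckScenario(false) // message stays queued
const withGuard = stuckScenario(true)     // guard drains the queue
```

Note that the stuck state here is produced by writing to internal state directly, much like the test under review. Whether it can arise through the public API alone is the open question in this thread.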
I would recommend focusing on writing a test case where this can happen without tinkering with internal state. Once we understand why this can happen, we can decide whether a fix is needed here after all. My understanding is that it is a bug in Aedes not calling the callback in some weird case.
This PR is equivalent to removing the concurrency option. I'm not sure I agree with that change, but maybe behind an option.
> My understanding is that it is a bug in Aedes not calling the callback on some weird case.
Yeah, but I have spent 2 days checking the aedes code to find out where this happens, without success. If you have a few minutes to take a quick look at it, it would be much appreciated, Matteo 🙏🏼
> This PR is equivalent to removing the concurrency option
I agree that it would be better to have a test case that reproduces this without changing private vars. BTW, as I explained in my previous comment, this PR doesn't change any functionality. If it did, it would break the concurrency test, which is currently passing (that is why my first commit was breaking CI). This just introduces a second guard against a stuck queue.
lgtm
No description provided.